[ASTERIXDB-3196][TX] Support atomic Txn with no WAL

- user model changes: yes
- storage format changes: no
- interface changes: yes

Details:

- Prevent concurrent metadata catalog modification.
- Introduce atomic txn without WAL that will persist
  all records on commit or delete on abort.
- Compute-storage separation fixes.

Change-Id: Icfd034a4dc0b6464564e2129a166e4ceb0dc7b41
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17583
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 2ed1638..372cf69 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -154,7 +154,7 @@
         }
         if (checkpoints.isEmpty()) {
             LOGGER.warn("Couldn't find any checkpoint file for index {}. Content of dir are {}.", indexPath,
-                    ioManager.getMatchingFiles(indexPath, IoUtil.NO_OP_FILTER).toString());
+                    ioManager.list(indexPath, IoUtil.NO_OP_FILTER).toString());
             throw new IllegalStateException("Couldn't find any checkpoints for resource: " + indexPath);
         }
         checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
@@ -182,7 +182,7 @@
 
     private List<IndexCheckpoint> getCheckpoints() throws ClosedByInterruptException, HyracksDataException {
         List<IndexCheckpoint> checkpoints = new ArrayList<>();
-        final Collection<FileReference> checkpointFiles = ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+        final Collection<FileReference> checkpointFiles = ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
         if (!checkpointFiles.isEmpty()) {
             for (FileReference checkpointFile : checkpointFiles) {
                 try {
@@ -229,8 +229,7 @@
 
     private void deleteHistory(long latestId, int historyToKeep) {
         try {
-            final Collection<FileReference> checkpointFiles =
-                    ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+            final Collection<FileReference> checkpointFiles = ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
             if (!checkpointFiles.isEmpty()) {
                 for (FileReference checkpointFile : checkpointFiles) {
                     if (getCheckpointIdFromFileName(checkpointFile) < (latestId - historyToKeep)) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
index cc69bb1..eb2b05b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
@@ -27,7 +27,7 @@
     public static final String CONFIG_FILE = joinPath(RESOURCES_PATH, "cc-cloud-storage.conf");
 
     public static void main(String[] args) throws Exception {
-        //        CloudUtils.startS3CloudEnvironment();
+        CloudUtils.startS3CloudEnvironment();
         final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
         try {
             integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"),
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index c4390fa..38fdf56 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.test.dataflow;
 
 import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
 import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -70,20 +69,10 @@
         }
     }
 
-    static void setFinal(Field field, Object obj, Object newValue) throws Exception {
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(obj, newValue);
-    }
-
     @SuppressWarnings({ "rawtypes", "unchecked" })
     static void replaceMapEntry(Field field, Object obj, Object key, Object value)
             throws Exception, IllegalAccessException {
         field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
         Map map = (Map) field.get(obj);
         map.put(key, value);
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
index f87adba..799da68 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
@@ -185,7 +185,8 @@
 
         // Add the remaining files that are not stored locally (if any)
         for (String cloudFile : cloudFiles) {
-            localFiles.add(dir.getChild(IoUtil.getFileNameFromPath(cloudFile)));
+            localFiles.add(new FileReference(dir.getDeviceHandle(),
+                    cloudFile.substring(cloudFile.indexOf(dir.getRelativePath()))));
         }
         return new HashSet<>(localFiles);
     }
@@ -244,12 +245,11 @@
         return super.doSyncRead(fHandle, offset, data);
     }
 
-    // TODO: We need to download this too
     @Override
     public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
         if (!fileRef.getFile().exists()) {
-            // TODO(htowaileb): if it does not exist, download (lazy)
-            // TODO(htowaileb): make sure downloading the file is synchronous since many can request it at the same time
+            IFileHandle open = open(fileRef, FileReadWriteMode.READ_WRITE, FileSyncMode.METADATA_SYNC_DATA_SYNC);
+            fileRef = open.getFileReference();
         }
         return super.readAllBytes(fileRef);
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b0d8e02..2704e64 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -37,6 +38,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -297,6 +299,31 @@
         return "Dataset (" + datasetID + "), Partition (" + partition + ")";
     }
 
+    public void deleteMemoryComponent() throws HyracksDataException {
+        Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
+        ILSMIndex primaryLsmIndex = null;
+        for (ILSMIndex lsmIndex : indexes) {
+            if (lsmIndex.isPrimaryIndex()) {
+                if (lsmIndex.isCurrentMutableComponentEmpty()) {
+                    LOGGER.info("Primary index on dataset {} and partition {} is empty... skipping delete",
+                            dsInfo.getDatasetID(), partition);
+                    return;
+                }
+                primaryLsmIndex = lsmIndex;
+                break;
+            }
+        }
+        Objects.requireNonNull(primaryLsmIndex, "no primary index found in " + indexes);
+        idGenerator.refresh();
+        ILSMComponentId nextComponentId = idGenerator.getId();
+        Map<String, Object> flushMap = new HashMap<>();
+        flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, 0L);
+        flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
+        ILSMIndexAccessor accessor = primaryLsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+        accessor.getOpContext().setParameters(flushMap);
+        accessor.deleteComponents(c -> c.getType() == ILSMComponent.LSMComponentType.MEMORY);
+    }
+
     private boolean canSafelyFlush() {
         return numActiveOperations.get() == 0;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index 940535f..34d7caa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.transactions;
 
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 
@@ -145,4 +147,16 @@
      * so that any resources held by the transaction may be released
      */
     void complete();
+
+    /**
+     * Acquires {@code lock} write lock and sets the transactions as a write transaction
+     * @param lock
+     */
+    void acquireExclusiveWriteLock(ReentrantLock lock);
+
+    /**
+     * Determines if this tx uses WAL
+     * @return true if this tx uses WAL. Otherwise, false.
+     */
+    boolean hasWAL();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index 396d3f6..30c693d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -43,6 +43,10 @@
          */
         ATOMIC,
         /**
+         * all records are committed and persisted to disk or nothing
+         */
+        ATOMIC_NO_WAL,
+        /**
          * any record with entity commit log
          */
         ENTITY_LEVEL
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 2195f88..522fd32 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -28,6 +28,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -116,6 +117,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.fulltext.FullTextConfigDescriptor;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
+import org.apache.asterix.transaction.management.opcallbacks.NoOpModificationOpCallback;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallback;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
@@ -161,6 +163,8 @@
     private transient MetadataTupleTranslatorProvider tupleTranslatorProvider;
     // extension only
     private Map<ExtensionMetadataDatasetId, ExtensionMetadataDataset<?>> extensionDatasets;
+    private final ReentrantLock metadataModificationLock = new ReentrantLock(true);
+    private boolean atomicNoWAL;
 
     public static final MetadataNode INSTANCE = new MetadataNode();
 
@@ -184,6 +188,7 @@
             }
         }
         this.txnIdFactory = new CachingTxnIdFactory(runtimeContext);
+        atomicNoWAL = runtimeContext.isCloudDeployment();
     }
 
     public int getMetadataStoragePartition() {
@@ -192,7 +197,8 @@
 
     @Override
     public void beginTransaction(TxnId transactionId) {
-        TransactionOptions options = new TransactionOptions(AtomicityLevel.ATOMIC);
+        AtomicityLevel lvl = atomicNoWAL ? AtomicityLevel.ATOMIC_NO_WAL : AtomicityLevel.ATOMIC;
+        TransactionOptions options = new TransactionOptions(lvl);
         transactionSubsystem.getTransactionManager().beginTransaction(transactionId, options);
     }
 
@@ -501,11 +507,7 @@
         if (!force) {
             confirmFullTextFilterCanBeDeleted(txnId, dataverseName, filterName);
         }
-
         try {
-            FullTextFilterMetadataEntityTupleTranslator translator =
-                    tupleTranslatorProvider.getFullTextFilterTupleTranslator(true);
-
             ITupleReference key = createTuple(dataverseName.getCanonicalForm(), filterName);
             deleteTupleFromIndex(txnId, MetadataPrimaryIndexes.FULL_TEXT_FILTER_DATASET, key);
         } catch (HyracksDataException e) {
@@ -612,7 +614,7 @@
             IModificationOperationCallback modCallback = createIndexModificationCallback(op, txnCtx, metadataIndex);
             IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
-            txnCtx.setWriteTxn(true);
+            txnCtx.acquireExclusiveWriteLock(metadataModificationLock);
             txnCtx.register(metadataIndex.getResourceId(),
                     StoragePathUtil.getPartitionNumFromRelativePath(resourceName), lsmIndex, modCallback,
                     metadataIndex.isPrimaryIndex());
@@ -640,6 +642,12 @@
         switch (indexOp) {
             case INSERT:
             case DELETE:
+                if (!txnCtx.hasWAL()) {
+                    return new NoOpModificationOpCallback(metadataIndex.getDatasetId(),
+                            metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
+                            transactionSubsystem, metadataIndex.getResourceId(), metadataStoragePartition,
+                            ResourceType.LSM_BTREE, indexOp);
+                }
                 /*
                  * Regardless of the index type (primary or secondary index), secondary index modification
                  * callback is given. This is still correct since metadata index operation doesn't require
@@ -650,6 +658,12 @@
                         transactionSubsystem, metadataIndex.getResourceId(), metadataStoragePartition,
                         ResourceType.LSM_BTREE, indexOp);
             case UPSERT:
+                if (!txnCtx.hasWAL()) {
+                    return new NoOpModificationOpCallback(metadataIndex.getDatasetId(),
+                            metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
+                            transactionSubsystem, metadataIndex.getResourceId(), metadataStoragePartition,
+                            ResourceType.LSM_BTREE, indexOp);
+                }
                 return new UpsertOperationCallback(metadataIndex.getDatasetId(), metadataIndex.getPrimaryKeyIndexes(),
                         txnCtx, transactionSubsystem.getLockManager(), transactionSubsystem,
                         metadataIndex.getResourceId(), metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index a6ba0e2..11bac0d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -71,7 +71,7 @@
             // Get most recent sequence of existing files to avoid deletion
             FileReference indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
             Collection<FileReference> files =
-                    ioManager.getMatchingFiles(indexPath, AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+                    ioManager.list(indexPath, AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
             if (files == null) {
                 throw HyracksDataException
                         .create(new IOException(indexPath + " is not a directory or an IO Error occurred"));
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java
new file mode 100644
index 0000000..ad77939
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.opcallbacks;
+
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class NoOpModificationOpCallback extends AbstractIndexModificationOperationCallback {
+
+    public NoOpModificationOpCallback(DatasetId datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
+            ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
+            byte resourceType, Operation indexOp) {
+        super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
+                resourceType, indexOp);
+    }
+
+    @Override
+    public void before(ITupleReference tuple) throws HyracksDataException {
+        // no op
+    }
+
+    @Override
+    public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
+        // no op
+    }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 25b9610..4e71b1c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -63,6 +63,7 @@
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
@@ -240,7 +241,7 @@
             List<FileReference> roots) throws HyracksDataException {
         Map<Long, LocalResource> resourcesMap = new HashMap<>();
         for (FileReference root : roots) {
-            final Collection<FileReference> files = ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
+            final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER);
             try {
                 for (FileReference file : files) {
                     final LocalResource localResource = readLocalResource(file);
@@ -274,7 +275,7 @@
 
     public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException {
         for (FileReference root : storageRoots) {
-            final Collection<FileReference> files = ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
+            final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER);
             try {
                 for (FileReference file : files) {
                     final LocalResource localResource = readLocalResource(file);
@@ -452,7 +453,7 @@
 
     private List<String> getIndexFiles(FileReference indexDir) throws HyracksDataException {
         final List<String> indexFiles = new ArrayList<>();
-        Collection<FileReference> indexFilteredFiles = ioManager.getMatchingFiles(indexDir, LSM_INDEX_FILES_FILTER);
+        Collection<FileReference> indexFilteredFiles = ioManager.list(indexDir, LSM_INDEX_FILES_FILTER);
         indexFilteredFiles.stream().map(FileReference::getAbsolutePath).forEach(indexFiles::add);
         return indexFiles;
     }
@@ -492,8 +493,7 @@
 
     public synchronized void deleteCorruptedResources() throws HyracksDataException {
         for (FileReference root : storageRoots) {
-            final Collection<FileReference> metadataMaskFiles =
-                    ioManager.getMatchingFiles(root, METADATA_MASK_FILES_FILTER);
+            final Collection<FileReference> metadataMaskFiles = ioManager.list(root, METADATA_MASK_FILES_FILTER);
             for (FileReference metadataMaskFile : metadataMaskFiles) {
                 final FileReference resourceFile = metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
                 ioManager.delete(resourceFile);
@@ -503,7 +503,7 @@
     }
 
     private void deleteIndexMaskedFiles(FileReference index) throws IOException {
-        Collection<FileReference> masks = ioManager.getMatchingFiles(index, MASK_FILES_FILTER);
+        Collection<FileReference> masks = ioManager.list(index, MASK_FILES_FILTER);
         for (FileReference mask : masks) {
             deleteIndexMaskedFiles(index, mask);
             // delete the mask itself
@@ -520,7 +520,7 @@
     }
 
     private void deleteIndexInvalidComponents(FileReference index) throws IOException, ParseException {
-        final Collection<FileReference> indexComponentFiles = ioManager.getMatchingFiles(index, COMPONENT_FILES_FILTER);
+        final Collection<FileReference> indexComponentFiles = ioManager.list(index, COMPONENT_FILES_FILTER);
         if (indexComponentFiles == null) {
             throw new IOException(index + " doesn't exist or an IO error occurred");
         }
@@ -550,10 +550,10 @@
         Collection<FileReference> maskedFiles;
         if (isComponentMask(mask)) {
             final String componentId = mask.getName().substring(StorageConstants.COMPONENT_MASK_FILE_PREFIX.length());
-            maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> name.startsWith(componentId));
+            maskedFiles = ioManager.list(index, (dir, name) -> name.startsWith(componentId));
         } else {
             final String maskedFileName = mask.getName().substring(StorageConstants.MASK_FILE_PREFIX.length());
-            maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> name.equals(maskedFileName));
+            maskedFiles = ioManager.list(index, (dir, name) -> name.equals(maskedFileName));
         }
         if (maskedFiles != null) {
             for (FileReference maskedFile : maskedFiles) {
@@ -643,14 +643,11 @@
         }
     }
 
-    public synchronized List<FileReference> getOnDiskPartitions() throws HyracksDataException {
+    public synchronized List<FileReference> getOnDiskPartitions() {
         List<FileReference> onDiskPartitions = new ArrayList<>();
         for (FileReference root : storageRoots) {
-            Collection<FileReference> partitions = ioManager.list(root, (dir, name) -> dir != null && dir.isDirectory()
-                    && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
-            if (partitions != null) {
-                onDiskPartitions.addAll(partitions);
-            }
+            onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir, name) -> dir != null && dir.isDirectory()
+                    && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)));
         }
         return onDiskPartitions;
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index 104f9a7..8a1856c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -23,8 +23,10 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.asterix.common.context.ITransactionOperationTracker;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TxnId;
@@ -42,6 +44,7 @@
     private final AtomicInteger txnState;
     private final AtomicBoolean isWriteTxn;
     private volatile boolean isTimeout;
+    private ReentrantLock exclusiveLock;
 
     protected AbstractTransactionContext(TxnId txnId) {
         this.txnId = txnId;
@@ -90,6 +93,21 @@
     }
 
     @Override
+    public void acquireExclusiveWriteLock(ReentrantLock exclusiveLock) {
+        if (isWriteTxn.get()) {
+            return;
+        }
+        try {
+            exclusiveLock.lockInterruptibly();
+            this.exclusiveLock = exclusiveLock;
+            setWriteTxn(true);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ACIDException(e);
+        }
+    }
+
+    @Override
     public void setWriteTxn(boolean isWriteTxn) {
         this.isWriteTxn.set(isWriteTxn);
     }
@@ -114,6 +132,9 @@
             synchronized (txnOpTrackers) {
                 txnOpTrackers.forEach((resource, opTracker) -> opTracker.afterTransaction(resource));
             }
+            if (exclusiveLock != null) {
+                exclusiveLock.unlock();
+            }
         }
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
new file mode 100644
index 0000000..cfa39c6
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class AtomicNoWALTransactionContext extends AtomicTransactionContext {
+
+    public AtomicNoWALTransactionContext(TxnId txnId) {
+        super(txnId);
+    }
+
+    @Override
+    public void cleanup() {
+        super.cleanup();
+        final int txnState = getTxnState();
+        switch (txnState) {
+            case ITransactionManager.ABORTED:
+                deleteUncommittedRecords();
+                break;
+            case ITransactionManager.COMMITTED:
+                ensureDurable();
+                break;
+            default:
+                throw new IllegalStateException("invalid state in txn clean up: " + getTxnState());
+        }
+    }
+
+    private void deleteUncommittedRecords() {
+        for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+            PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
+            try {
+                primaryIndexOpTracker.deleteMemoryComponent();
+            } catch (HyracksDataException e) {
+                throw new ACIDException(e);
+            }
+        }
+    }
+
+    private void ensureDurable() {
+        List<FlushOperation> flushes = new ArrayList<>();
+        LogRecord dummyLogRecord = new LogRecord();
+        try {
+            for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+                PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
+                primaryIndexOpTracker.triggerScheduleFlush(dummyLogRecord);
+                flushes.addAll(primaryIndexOpTracker.getScheduledFlushes());
+            }
+            LSMIndexUtil.waitFor(flushes);
+        } catch (HyracksDataException e) {
+            throw new ACIDException(e);
+        }
+    }
+
+    @Override
+    public boolean hasWAL() {
+        return false;
+    }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 083c26b..870a76b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -18,7 +18,10 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -38,6 +41,7 @@
     private final Map<Long, ILSMOperationTracker> opTrackers = new ConcurrentHashMap<>();
     private final Map<Long, AtomicInteger> indexPendingOps = new ConcurrentHashMap<>();
     private final Map<Long, IModificationOperationCallback> callbacks = new ConcurrentHashMap<>();
+    protected final Set<ILSMOperationTracker> modifiedIndexes = Collections.synchronizedSet(new HashSet<>());
 
     public AtomicTransactionContext(TxnId txnId) {
         super(txnId);
@@ -64,6 +68,7 @@
     @Override
     public void beforeOperation(long resourceId) {
         indexPendingOps.get(resourceId).incrementAndGet();
+        modifiedIndexes.add(opTrackers.get(resourceId));
     }
 
     @Override
@@ -110,4 +115,9 @@
         AtomicTransactionContext that = (AtomicTransactionContext) o;
         return this.txnId.equals(that.txnId);
     }
+
+    @Override
+    public boolean hasWAL() {
+        return true;
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index be5874e..2e5f6dd 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -113,4 +113,9 @@
         EntityLevelTransactionContext that = (EntityLevelTransactionContext) o;
         return this.txnId.equals(that.txnId);
     }
+
+    @Override
+    public boolean hasWAL() {
+        return true;
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
index 4a465a4..1076086 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
@@ -34,6 +34,8 @@
         switch (atomicityLevel) {
             case ATOMIC:
                 return new AtomicTransactionContext(txnId);
+            case ATOMIC_NO_WAL:
+                return new AtomicNoWALTransactionContext(txnId);
             case ENTITY_LEVEL:
                 return new EntityLevelTransactionContext(txnId);
             default:
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index ee65962..8bbfbfd 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -81,9 +81,11 @@
         final ITransactionContext txnCtx = getTransactionContext(txnId);
         try {
             if (txnCtx.isWriteTxn()) {
-                LogRecord logRecord = new LogRecord();
-                TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, true);
-                txnSubsystem.getLogManager().log(logRecord);
+                if (txnCtx.hasWAL()) {
+                    LogRecord logRecord = new LogRecord();
+                    TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, true);
+                    txnSubsystem.getLogManager().log(logRecord);
+                }
                 txnCtx.setTxnState(ITransactionManager.COMMITTED);
             }
         } catch (Exception e) {
@@ -103,13 +105,15 @@
         final ITransactionContext txnCtx = getTransactionContext(txnId);
         try {
             if (txnCtx.isWriteTxn()) {
-                if (txnCtx.getFirstLSN() != TERMINAL_LSN) {
-                    LogRecord logRecord = new LogRecord();
-                    TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
-                    txnSubsystem.getLogManager().log(logRecord);
-                    txnSubsystem.getCheckpointManager().secure(txnId);
+                if (txnCtx.hasWAL()) {
+                    if (txnCtx.getFirstLSN() != TERMINAL_LSN) {
+                        LogRecord logRecord = new LogRecord();
+                        TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
+                        txnSubsystem.getLogManager().log(logRecord);
+                        txnSubsystem.getCheckpointManager().secure(txnId);
+                    }
+                    txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
                 }
-                txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
                 txnCtx.setTxnState(ITransactionManager.ABORTED);
             }
         } catch (HyracksDataException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 63226ac..10f1637 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -23,7 +23,6 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.WritableByteChannel;
-import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
@@ -125,6 +124,13 @@
 
     Set<FileReference> list(FileReference dir) throws HyracksDataException;
 
+    /**
+     * Lists the files matching {@code filter} recursively starting from {@code dir}
+     * @param dir
+     * @param filter
+     * @return the matching files
+     * @throws HyracksDataException
+     */
     Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException;
 
     void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException;
@@ -136,9 +142,6 @@
 
     void deleteDirectory(FileReference root) throws HyracksDataException;
 
-    // TODO: Remove and use list
-    Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter) throws HyracksDataException;
-
     boolean exists(FileReference fileRef) throws HyracksDataException;
 
     void create(FileReference fileRef) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 7644a30..05e6544 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -29,8 +29,11 @@
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -194,4 +197,18 @@
     public static String getFileNameFromPath(String path) {
         return path.substring(path.lastIndexOf('/') + 1);
     }
+
+    public static Collection<FileReference> getMatchingChildren(FileReference root, FilenameFilter filter) {
+        if (!root.getFile().isDirectory()) {
+            throw new IllegalArgumentException("Parameter 'root' is not a directory: " + root);
+        }
+        Objects.requireNonNull(filter);
+        List<FileReference> files = new ArrayList<>();
+        String[] matchingFiles = root.getFile().list(filter);
+        if (matchingFiles != null) {
+            files.addAll(Arrays.stream(matchingFiles).map(pDir -> new FileReference(root.getDeviceHandle(), pDir))
+                    .collect(Collectors.toList()));
+        }
+        return files;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 19fbff3..1733ad4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -517,27 +517,13 @@
 
     @Override
     public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
-        /*
-         * Throws an error if this abstract pathname does not denote a directory, or if an I/O error occurs.
-         * Returns an empty set if the file does not exist, otherwise, returns the files in the specified directory
-         */
         Set<FileReference> listedFiles = new HashSet<>();
         if (!dir.getFile().exists()) {
             return listedFiles;
         }
-
-        String[] files = dir.getFile().list(filter);
-        if (files == null) {
-            if (!dir.getFile().canRead()) {
-                throw HyracksDataException.create(ErrorCode.CANNOT_READ_FILE, dir);
-            } else if (!dir.getFile().isDirectory()) {
-                throw HyracksDataException.create(ErrorCode.FILE_IS_NOT_DIRECTORY, dir);
-            }
-            throw HyracksDataException.create(ErrorCode.UNIDENTIFIED_IO_ERROR_READING_FILE, dir);
-        }
-
-        for (String file : files) {
-            listedFiles.add(dir.getChild(file));
+        Collection<File> files = IoUtil.getMatchingFiles(dir.getFile().toPath(), filter);
+        for (File file : files) {
+            listedFiles.add(resolveAbsolutePath(file.getAbsolutePath()));
         }
         return listedFiles;
     }
@@ -579,23 +565,6 @@
     }
 
     @Override
-    public Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter)
-            throws HyracksDataException {
-        File rootFile = root.getFile();
-        if (!rootFile.exists() || !rootFile.isDirectory()) {
-            return Collections.emptyList();
-        }
-
-        Collection<File> files = IoUtil.getMatchingFiles(rootFile.toPath(), filter);
-        Set<FileReference> fileReferences = new HashSet<>();
-        for (File file : files) {
-            fileReferences.add(resolveAbsolutePath(file.getAbsolutePath()));
-        }
-
-        return fileReferences;
-    }
-
-    @Override
     public boolean exists(FileReference fileRef) throws HyracksDataException {
         return fileRef.getFile().exists();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index 45bfed1..1c0d7b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -23,7 +23,6 @@
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.common.IIndex;
@@ -101,7 +100,7 @@
                     LOGGER.warn(
                             "Deleting {} on index create. The index is not registered but the file exists in the filesystem",
                             resolvedResourceRef);
-                    IoUtil.delete(resolvedResourceRef);
+                    ctx.getIoManager().delete(resolvedResourceRef);
                 }
                 index = resource.createInstance(ctx);
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
index 6a4f10d..1bceea3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -42,8 +42,8 @@
 
     protected final BufferCache bufferCache;
     protected final IPageReplacementStrategy pageReplacementStrategy;
+    protected final IOManager ioManager;
     private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache;
-    private final IOManager ioManager;
 
     private IFileHandle fileHandle;
     private volatile boolean hasOpen;
@@ -193,7 +193,7 @@
         }
     }
 
-    public static void deleteFile(FileReference fileRef) throws HyracksDataException {
+    public static void deleteFile(FileReference fileRef, IIOManager ioManager) throws HyracksDataException {
         HyracksDataException savedEx = null;
 
         /*
@@ -206,7 +206,7 @@
                 final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
                 final FileReference lafFileRef = cFileRef.getLAFFileReference();
                 if (lafFileRef.getFile().exists()) {
-                    IoUtil.delete(lafFileRef);
+                    ioManager.delete(lafFileRef);
                 }
             }
         } catch (HyracksDataException e) {
@@ -214,7 +214,7 @@
         }
 
         try {
-            IoUtil.delete(fileRef);
+            ioManager.delete(fileRef);
         } catch (HyracksDataException e) {
             if (savedEx != null) {
                 savedEx.addSuppressed(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 70500e5..10284f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -43,7 +43,6 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
 import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.storage.common.file.IFileMapManager;
@@ -761,7 +760,7 @@
         } catch (Exception e) {
             // If file registration failed for any reason, we need to undo the file creation
             try {
-                IoUtil.delete(fileRef);
+                ioManager.delete(fileRef);
             } catch (Exception deleteException) {
                 e.addSuppressed(deleteException);
             }
@@ -960,7 +959,7 @@
         if (mapped) {
             deleteFile(fileId);
         } else {
-            BufferedFileHandle.deleteFile(fileRef);
+            BufferedFileHandle.deleteFile(fileRef, ioManager);
         }
     }
 
@@ -991,7 +990,7 @@
                         fInfo.markAsDeleted();
                     }
                 } finally {
-                    BufferedFileHandle.deleteFile(fileRef);
+                    BufferedFileHandle.deleteFile(fileRef, ioManager);
                 }
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
index 7d0cc62..d78ac24 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
@@ -23,6 +23,7 @@
 
 import org.apache.hyracks.api.compression.ICompressorDecompressor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.ICachedPageInternal;
@@ -64,6 +65,7 @@
     private final IBufferCache bufferCache;
     private final ICompressorDecompressor compressorDecompressor;
     private final CompressedFileReference fileRef;
+    private final IIOManager ioManager;
 
     private int fileId;
     private State state;
@@ -71,12 +73,13 @@
 
     private LAFWriter lafWriter;
 
-    public CompressedFileManager(IBufferCache bufferCache, CompressedFileReference fileRef) {
+    public CompressedFileManager(IBufferCache bufferCache, CompressedFileReference fileRef, IIOManager ioManager) {
         state = State.CLOSED;
         totalNumOfPages = 0;
         this.bufferCache = bufferCache;
         this.fileRef = fileRef;
         this.compressorDecompressor = fileRef.getCompressorDecompressor();
+        this.ioManager = ioManager;
     }
 
     /* ************************
@@ -99,7 +102,7 @@
         ensureState(CLOSED);
 
         boolean open = false;
-        if (fileRef.getLAFFileReference().getFile().exists()) {
+        if (ioManager.exists(fileRef.getLAFFileReference())) {
             fileId = bufferCache.openFile(fileRef.getLAFFileReference());
             open = true;
         } else {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index b6178c5..01811c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -157,7 +157,7 @@
     @Override
     public void open(FileReference fileRef) throws HyracksDataException {
         final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
-        compressedFileManager = new CompressedFileManager(bufferCache, cFileRef);
+        compressedFileManager = new CompressedFileManager(bufferCache, cFileRef, ioManager);
         compressedFileManager.open();
         super.open(fileRef);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
index ddb5717..de8bdc8 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.storage.am.common;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.util.Log4j2Monitor;
 import org.apache.logging.log4j.Level;
@@ -97,7 +96,6 @@
         index.destroy();
         Assert.assertFalse(persistentStateExists());
         index.destroy();
-        Assert.assertTrue(Log4j2Monitor.count(IoUtil.FILE_NOT_FOUND_MSG) > 0);
         Assert.assertFalse(persistentStateExists());
     }