Fix issue 97: do not schedule a flush when the in-memory component is unmodified

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging_issue_97@3328 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-examples/piglet-example/pom.xml b/algebricks/algebricks-examples/piglet-example/pom.xml
index d4d6c89..d3314b5 100644
--- a/algebricks/algebricks-examples/piglet-example/pom.xml
+++ b/algebricks/algebricks-examples/piglet-example/pom.xml
@@ -49,6 +49,41 @@
 				</configuration>
 			</plugin>
 		</plugins>
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>
+											org.codehaus.mojo
+										</groupId>
+										<artifactId>
+											javacc-maven-plugin
+										</artifactId>
+										<versionRange>
+											[2.6,)
+										</versionRange>
+										<goals>
+											<goal>javacc</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
 	</build>
 	<dependencies>
 		<dependency>
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 655cd12..2847004 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -75,7 +75,7 @@
 
 public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
 
-    // In-memory components.   
+    // In-memory components.
     private final LSMBTreeMutableComponent mutableComponent;
 
     // For creating BTree's used in flush and merge.
@@ -263,6 +263,7 @@
                 ctx.memBTreeAccessor.upsert(tuple);
                 break;
         }
+        mutableComponent.setIsModified();
     }
 
     private boolean insert(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, IndexException {
@@ -290,8 +291,12 @@
             memCursor.close();
         }
 
-        // TODO: Can we just remove the above code that search the mutable component and do it together with the search call below? i.e. instead of passing false to the lsmHarness.search(), we pass true to include the mutable component?
-        // the key was not in the inmemory component, so check the disk components
+        // TODO: Can we just remove the above code that searches the mutable
+        // component and do it together with the search call below? I.e., instead
+        // of passing false to the lsmHarness.search(), we pass true to include
+        // the mutable component?
+        // The key was not in the in-memory component, so check the disk
+        // components.
         search(ctx, searchCursor, predicate);
         try {
             if (searchCursor.hasNext()) {
@@ -321,8 +326,11 @@
     }
 
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public boolean scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
+        if (!mutableComponent.isModified()) {
+            return false;
+        }
         LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
         LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         assert ctx.getComponentHolder().size() == 1;
@@ -332,6 +340,7 @@
         ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent, componentFileRefs
                 .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback));
+        return true;
     }
 
     @Override
@@ -474,7 +483,8 @@
 
     @Override
     public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
-        // The order of forcing the dirty page to be flushed is critical. The bloom filter must be always done first.
+        // The order of forcing the dirty page to be flushed is critical. The
+        // bloom filter must always be done first.
         LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) lsmComponent;
         // Flush the bloom filter first.
         int fileId = component.getBloomFilter().getFileId();
@@ -635,6 +645,6 @@
         for (ILSMComponent c : immutableComponents) {
             BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
             btree.validate();
-		}
-	}
+        }
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMutableComponent.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMutableComponent.java
index 30e79b4..d340599 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMutableComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMutableComponent.java
@@ -41,6 +41,7 @@
 
     @Override
     protected void reset() throws HyracksDataException {
+        super.reset();
         btree.clear();
     }
 
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index cff47bb..e8c09c0 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -38,7 +38,7 @@
     public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
             ISearchOperationCallback searchCallback);
 
-    public boolean getFlushStatus(ILSMIndex index);
+    public boolean getFlushStatus();
 
     public ILSMOperationTracker getOperationTracker();
 
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
index e98165b..f2c052b 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
@@ -36,7 +36,7 @@
     public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
             throws HyracksDataException, IndexException;
 
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public boolean scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException;
 
     public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
@@ -65,6 +65,6 @@
 
     public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException;
 
-    public void setFlushStatus(ILSMIndex index, boolean needsFlush);
+    public void setFlushStatus(boolean needsFlush);
 
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 610e1c7..e4dd369 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -28,7 +28,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -168,12 +167,12 @@
     }
 
     @Override
-    public void setFlushStatus(ILSMIndex index, boolean needsFlush) {
+    public void setFlushStatus(boolean needsFlush) {
         this.needsFlush = needsFlush;
     }
 
     @Override
-    public boolean getFlushStatus(ILSMIndex index) {
+    public boolean getFlushStatus() {
         return needsFlush;
     }
 
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
index 1a6636a..5cde7bc 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
@@ -8,6 +8,8 @@
     private int readerCount;
     private int writerCount;
     private ComponentState state;
+    
+    private boolean isModified;
 
     private enum ComponentState {
         READABLE_WRITABLE,
@@ -20,6 +22,7 @@
         readerCount = 0;
         writerCount = 0;
         state = ComponentState.READABLE_WRITABLE;
+        isModified = false;
     }
 
     @Override
@@ -97,8 +100,19 @@
         }
         notifyAll();
     }
+    
+    /** Flags this component as modified; reset() clears the flag again. */
+    public void setIsModified() {
+        isModified = true;
+    }
+
+    public boolean isModified() {
+        return isModified;
+    }
 
     protected abstract boolean isFull();
 
-    protected abstract void reset() throws HyracksDataException;
+    protected void reset() throws HyracksDataException {
+        isModified = false; // private flag: overriding subclasses must call super.reset() to clear it
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 4a140b4..65e6a3d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -45,8 +46,8 @@
     }
 
     private void threadExit(ILSMIndexOperationContext opCtx, LSMOperationType opType) throws HyracksDataException {
-        if (!lsmIndex.getFlushStatus(lsmIndex) && lsmIndex.getInMemoryFreePageManager().isFull()) {
-            lsmIndex.setFlushStatus(lsmIndex, true);
+        if (!lsmIndex.getFlushStatus() && lsmIndex.getInMemoryFreePageManager().isFull()) {
+            lsmIndex.setFlushStatus(true);
         }
         opTracker.afterOperation(opType, opCtx.getSearchOperationCallback(), opCtx.getModificationCallback());
     }
@@ -168,8 +169,15 @@
         if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
             return;
         }
-        lsmIndex.setFlushStatus(lsmIndex, false);
-        lsmIndex.scheduleFlush(ctx, callback);
+
+        lsmIndex.setFlushStatus(false);
+        // If no flush was scheduled, invoke the callback hooks here and exit the components entered above.
+        if (!lsmIndex.scheduleFlush(ctx, callback)) {
+            callback.beforeOperation();
+            callback.afterOperation(null, null);
+            callback.afterFinalize(null);
+            exitComponents(ctx, LSMOperationType.FLUSH, false);
+        }
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
index 7fee06e..5dfa94e 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
@@ -39,7 +39,7 @@
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         // Flush will only be handled by last exiting thread.
         if (opType == LSMOperationType.MODIFICATION) {
-            if (threadRefCount.decrementAndGet() == 0 && index.getFlushStatus(index)) {
+            if (threadRefCount.decrementAndGet() == 0 && index.getFlushStatus()) {
                 ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
                 accessor.scheduleFlush(NoOpIOOperationCallback.INSTANCE);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 34195ce..33052f3 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -393,8 +393,11 @@
     }
 
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public boolean scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
+        if (!mutableComponent.isModified()) {
+            return false;
+        }
         LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
         LSMInvertedIndexOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
@@ -405,6 +408,7 @@
                 fileManager, opCtx), mutableComponent, componentFileRefs.getInsertIndexFileReference(),
                 componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
                 callback));
+        return true;
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMutableComponent.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMutableComponent.java
index c36319d..615f1ee 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMutableComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMutableComponent.java
@@ -49,6 +49,7 @@
 
     @Override
     protected void reset() throws HyracksDataException {
+        super.reset();
         invIndex.clear();
         deletedKeysBTree.clear();
     }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 23137ab..9b377a9 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -319,6 +319,7 @@
                 // that all the corresponding insert tuples are deleted
             }
         }
+        mutableComponent.setIsModified();
     }
 
     protected LSMRTreeOpContext createOpContext(IModificationOperationCallback modCallback) {
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 3bffb43..fc5b06d 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -206,8 +206,11 @@
     }
 
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public boolean scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
+        if (!mutableComponent.isModified()) {
+            return false;
+        }
         LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
         ILSMIndexOperationContext rctx = createOpContext(NoOpOperationCallback.INSTANCE);
         LSMRTreeMutableComponent flushingComponent = (LSMRTreeMutableComponent) ctx.getComponentHolder().get(0);
@@ -217,6 +220,7 @@
         ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, componentFileRefs
                 .getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs
                 .getBloomFilterFileReference(), callback));
+        return true;
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMutableComponent.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMutableComponent.java
index 80f76a1..2586d58 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMutableComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMutableComponent.java
@@ -48,9 +48,11 @@
 
     @Override
     protected void reset() throws HyracksDataException {
+        super.reset();
         rtree.clear();
         if (btree != null) {
             btree.clear();
         }
     }
+
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 478d076..be9073e 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -196,8 +196,11 @@
     }
 
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public boolean scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
+        if (!mutableComponent.isModified()) {
+            return false;
+        }
         LSMRTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE);
         LSMComponentFileReferences relFlushFileRefs = fileManager.getRelFlushFileReference();
         ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
@@ -206,6 +209,7 @@
         ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, relFlushFileRefs
                 .getInsertIndexFileReference(), null, null, callback));
+        return true;
     }
 
     @Override