Merge fullstack_hyracks_result_distribution from r3260:r3272 into fullstack_asterix_stabilization.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3273 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 371169f..52e6005 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -17,9 +17,10 @@
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.JobId;
 
-public interface IDatasetDirectoryService {
+public interface IDatasetDirectoryService extends IJobLifecycleListener {
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
             int nPartitions, NetworkAddress networkAddress);
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 05365d3..506a870 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -170,7 +170,7 @@
             }
         };
         sweeper = new DeadNodeSweeper();
-        datasetDirectoryService = new DatasetDirectoryService();
+        datasetDirectoryService = new DatasetDirectoryService(ccConfig.jobHistorySize);
         jobCounter = 0;
     }
 
@@ -205,6 +205,7 @@
 
     private void startApplication() throws Exception {
         appCtx = new CCApplicationContext(serverCtx, ccContext);
+        appCtx.addJobLifecycleListener(datasetDirectoryService);
         String className = ccConfig.appCCMainClass;
         if (className != null) {
             Class<?> c = Class.forName(className);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 13ae426..cdcdf4c 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -15,7 +15,7 @@
 package edu.uci.ics.hyracks.control.cc.dataset;
 
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
@@ -26,6 +26,8 @@
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.dataset.ResultSetMetaData;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 /**
@@ -38,14 +40,39 @@
 public class DatasetDirectoryService implements IDatasetDirectoryService {
     private final Map<JobId, DatasetJobRecord> jobResultLocations;
 
-    public DatasetDirectoryService() {
-        jobResultLocations = new HashMap<JobId, DatasetJobRecord>();
+    public DatasetDirectoryService(final int jobHistorySize) {
+        jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>() {
+            private static final long serialVersionUID = 1L;
+            // Bounded history: once an insert pushes size() past jobHistorySize, the oldest entry is dropped.
+            protected boolean removeEldestEntry(Map.Entry<JobId, DatasetJobRecord> eldest) {
+                return size() > jobHistorySize;
+            }
+        };
+    }
+
+    @Override
+    public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
+            throws HyracksException {
+        // Holds the monitor like the other accessors: put() on the LinkedHashMap may also evict an entry.
+        if (!jobResultLocations.containsKey(jobId)) {
+            jobResultLocations.put(jobId, new DatasetJobRecord());
+        }
+    }
+
+    @Override
+    public void notifyJobStart(JobId jobId) throws HyracksException {
+        // No-op: the job's record is already created in notifyJobCreation.
+    }
+
+    @Override
+    public void notifyJobFinish(JobId jobId) throws HyracksException {
+        // No-op: the record is kept after the job finishes; the map's size limit drops old ones.
     }
 
     @Override
     public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
             int partition, int nPartitions, NetworkAddress networkAddress) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
+        DatasetJobRecord djr = jobResultLocations.get(jobId);
 
         ResultSetMetaData resultSetMetaData = djr.get(rsId);
         if (resultSetMetaData == null) {
@@ -84,14 +111,20 @@
 
     @Override
     public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        djr.fail();
+        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        // No record (never created, or evicted from the bounded history): nothing to mark, still wake waiters.
+        if (djr != null) {
+            djr.fail();
+        }
         notifyAll();
     }
 
     @Override
     public synchronized void reportJobFailure(JobId jobId) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        djr.fail();
+        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        // No record (never created, or evicted from the bounded history): nothing to mark, still wake waiters.
+        if (djr != null) {
+            djr.fail();
+        }
         notifyAll();
     }
@@ -158,7 +185,11 @@
      */
     private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
             throws HyracksDataException {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
+        DatasetJobRecord djr = jobResultLocations.get(jobId);
+
+        if (djr == null) {
+            throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
+        }
 
         if (djr.getStatus() == Status.FAILED) {
             throw new HyracksDataException("Job failed.");
@@ -201,13 +232,4 @@
         }
         return null;
     }
-
-    private DatasetJobRecord getDatasetJobRecord(JobId jobId) {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
-        if (djr == null) {
-            djr = new DatasetJobRecord();
-            jobResultLocations.put(jobId, djr);
-        }
-        return djr;
-    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index d1577bc..ec29592 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -51,6 +51,9 @@
     @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
     public int maxMemory = -1;
 
+    @Option(name = "-result-history-size", usage = "Limits the number of jobs whose results should be remembered by the system to the specified value. (default: 100)")
+    public int resultHistorySize = 100;
+
     @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
     public int resultManagerMemory = -1;
 
@@ -79,10 +82,12 @@
         cList.add(String.valueOf(nNetThreads));
         cList.add("-max-memory");
         cList.add(String.valueOf(maxMemory));
+        cList.add("-result-history-size");
+        cList.add(String.valueOf(resultHistorySize));
         cList.add("-result-manager-memory");
         cList.add(String.valueOf(resultManagerMemory));
 
-       if (appNCMainClass != null) {
+        if (appNCMainClass != null) {
             cList.add("-app-nc-main-class");
             cList.add(appNCMainClass);
         }
@@ -92,5 +97,5 @@
                 cList.add(appArg);
             }
         }
-   }
+    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 10e0dba..e15c60e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -150,7 +150,8 @@
         partitionManager = new PartitionManager(this);
         netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
 
-        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory);
+        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
+                ncConfig.resultHistorySize);
         datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
                 datasetPartitionManager, ncConfig.nNetThreads);
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 1cad54b..1e58b5c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.nc.dataset;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
@@ -44,13 +44,29 @@
 
     private final DatasetMemoryManager datasetMemoryManager;
 
-    public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory) {
+    public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory,
+            final int resultHistorySize) {
         this.ncs = ncs;
         this.executor = executor;
-        partitionResultStateMap = new HashMap<JobId, ResultState[]>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
         datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+        partitionResultStateMap = new LinkedHashMap<JobId, ResultState[]>() {
+            private static final long serialVersionUID = 1L;
+
+            protected boolean removeEldestEntry(Map.Entry<JobId, ResultState[]> eldest) {
+                if (size() > resultHistorySize) {
+                    for (ResultState state : eldest.getValue()) {
+                        // NOTE(review): slots of partitions never registered here are presumably null - confirm.
+                        if (state != null) {
+                            state.deinit();
+                        }
+                    }
+                    return true;
+                }
+                return false;
+            }
+        };
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 3db3fd9..0f1d94c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -64,6 +64,10 @@
         notifyAll();
     }
 
+    public synchronized void deinit() {
+        fileRef.delete(); // NOTE(review): no null check on fileRef - confirm it is always set before this runs
+    }
+
     public ResultSetPartitionId getResultSetPartitionId() {
         return resultSetPartitionId;
     }