Added the ability to collect profile. Added REST Api to provide access to job state

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@173 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
index 63cb572..753079b 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
@@ -31,7 +31,6 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
 
 abstract class AbstractHyracksConnection implements IHyracksClientConnection {
     private final String ccHost;
@@ -95,7 +94,7 @@
     }
 
     @Override
-    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
-        return hci.waitForCompletion(jobId);
+    public void waitForCompletion(UUID jobId) throws Exception {
+        hci.waitForCompletion(jobId);
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index b3c1148..a164fee 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -21,7 +21,6 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
 
 public interface IHyracksClientConnection {
     public void createApplication(String appName, File harFile) throws Exception;
@@ -36,5 +35,5 @@
 
     public void start(UUID jobId) throws Exception;
 
-    public JobStatistics waitForCompletion(UUID jobId) throws Exception;
+    public void waitForCompletion(UUID jobId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 0c0089f..c4bf0e6 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -20,7 +20,6 @@
 
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
 
 public interface IHyracksClientInterface extends Remote {
     public ClusterControllerInfo getClusterControllerInfo() throws Exception;
@@ -37,5 +36,5 @@
 
     public void start(UUID jobId) throws Exception;
 
-    public JobStatistics waitForCompletion(UUID jobId) throws Exception;
+    public void waitForCompletion(UUID jobId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
index 2b78fbb..f577db2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
@@ -18,19 +18,17 @@
 import java.util.Map;
 import java.util.UUID;
 
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-
 public interface IClusterController extends Remote {
     public NodeParameters registerNode(INodeController nodeController) throws Exception;
 
     public void unregisterNode(INodeController nodeController) throws Exception;
 
     public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
-            StageletStatistics statistics) throws Exception;
+            Map<String, Long> statistics) throws Exception;
 
     public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
 
     public void nodeHeartbeat(String id) throws Exception;
 
-    public void reportProfile(String id, Map<String, Long> counterDump) throws Exception;
+    public void reportProfile(String id, Map<UUID, Map<String, Long>> counterDump) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
deleted file mode 100644
index 8784b46..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.job.statistics;
-
-import java.io.Serializable;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-public class JobStatistics implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private Date startTime;
-
-    private Date endTime;
-
-    private List<StageStatistics> stages;
-
-    public JobStatistics() {
-        stages = new ArrayList<StageStatistics>();
-    }
-
-    public void setStartTime(Date startTime) {
-        this.startTime = startTime;
-    }
-
-    public Date getStartTime() {
-        return startTime;
-    }
-
-    public void setEndTime(Date endTime) {
-        this.endTime = endTime;
-    }
-
-    public Date getEndTime() {
-        return endTime;
-    }
-
-    public void addStageStatistics(StageStatistics stageStatistics) {
-        stages.add(stageStatistics);
-    }
-
-    public List<StageStatistics> getStages() {
-        return stages;
-    }
-
-    @Override
-    public String toString() {
-        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-        StringBuilder buffer = new StringBuilder();
-
-        buffer.append("{\n");
-        indent(buffer, 1).append("startTime: '").append(startTime == null ? null : df.format(startTime)).append("',\n");
-        indent(buffer, 1).append("endTime: '").append(endTime == null ? null : df.format(endTime)).append("',\n");
-        indent(buffer, 1).append("stages: [\n");
-        boolean first = true;
-        for (StageStatistics ss : stages) {
-            if (!first) {
-                buffer.append(",\n");
-            }
-            first = false;
-            ss.toString(buffer, 2);
-        }
-        buffer.append("\n");
-        indent(buffer, 1).append("]\n");
-        buffer.append("}");
-
-        return buffer.toString();
-    }
-
-    static StringBuilder indent(StringBuilder buffer, int level) {
-        for (int i = 0; i < level; ++i) {
-            buffer.append(" ");
-        }
-        return buffer;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageStatistics.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageStatistics.java
deleted file mode 100644
index 6cc437b..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageStatistics.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.job.statistics;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-public class StageStatistics implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private UUID stageId;
-
-    private Map<String, StageletStatistics> stageletMap;
-
-    public StageStatistics() {
-        stageletMap = new HashMap<String, StageletStatistics>();
-    }
-
-    public void setStageId(UUID stageId) {
-        this.stageId = stageId;
-    }
-
-    public UUID getStageId() {
-        return stageId;
-    }
-
-    public void addStageletStatistics(StageletStatistics ss) {
-        stageletMap.put(ss.getNodeId(), ss);
-    }
-
-    public Map<String, StageletStatistics> getStageletStatistics() {
-        return stageletMap;
-    }
-
-    public void toString(StringBuilder buffer, int level) {
-        JobStatistics.indent(buffer, level).append("{\n");
-        JobStatistics.indent(buffer, level + 1).append("stageId: '").append(stageId).append("',\n");
-        JobStatistics.indent(buffer, level + 1).append("stagelets: {\n");
-        boolean first = true;
-        for (Map.Entry<String, StageletStatistics> e : stageletMap.entrySet()) {
-            if (!first) {
-                buffer.append(",\n");
-            }
-            first = false;
-            JobStatistics.indent(buffer, level + 2).append(e.getKey()).append(": ");
-            e.getValue().toString(buffer, level + 3);
-        }
-        buffer.append("\n");
-        JobStatistics.indent(buffer, level + 1).append("}\n");
-        JobStatistics.indent(buffer, level).append("}");
-    }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageletStatistics.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageletStatistics.java
deleted file mode 100644
index 66fb871..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageletStatistics.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.job.statistics;
-
-import java.io.Serializable;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class StageletStatistics implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private String nodeId;
-
-    private Date startTime;
-
-    private Date endTime;
-
-    private Map<String, String> statisticsMap;
-
-    public StageletStatistics() {
-        statisticsMap = Collections.synchronizedMap(new TreeMap<String, String>());
-    }
-
-    public void setNodeId(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public void setStartTime(Date startTime) {
-        this.startTime = startTime;
-    }
-
-    public Date getStartTime() {
-        return startTime;
-    }
-
-    public void setEndTime(Date endTime) {
-        this.endTime = endTime;
-    }
-
-    public Date getEndTime() {
-        return endTime;
-    }
-
-    public Map<String, String> getStatisticsMap() {
-        return statisticsMap;
-    }
-
-    public void toString(StringBuilder buffer, int level) {
-        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-        buffer.append("{\n");
-        JobStatistics.indent(buffer, level + 1).append("nodeId: '").append(nodeId).append("',\n");
-        JobStatistics.indent(buffer, level + 1).append("startTime: '").append(df.format(startTime)).append("',\n");
-        JobStatistics.indent(buffer, level + 1).append("endTime: '").append(df.format(endTime)).append("',\n");
-        JobStatistics.indent(buffer, level + 1).append("statistics: {\n");
-        boolean first = true;
-        for (Map.Entry<String, String> e : statisticsMap.entrySet()) {
-            if (!first) {
-                buffer.append(",\n");
-            }
-            first = false;
-            JobStatistics.indent(buffer, level + 2).append(e.getKey()).append(": '").append(e.getValue()).append("'");
-        }
-        buffer.append("\n");
-        JobStatistics.indent(buffer, level + 1).append("}\n");
-        JobStatistics.indent(buffer, level).append("}");
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/pom.xml b/hyracks-control-cc/pom.xml
index acdbd2e..1956fb7 100644
--- a/hyracks-control-cc/pom.xml
+++ b/hyracks-control-cc/pom.xml
@@ -32,13 +32,6 @@
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
-  		<groupId>org.apache.wicket</groupId>
-  		<artifactId>wicket</artifactId>
-  		<version>1.4.7</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>
-  	<dependency>
   		<groupId>jol</groupId>
   		<artifactId>jol</artifactId>
   		<version>1.0.0</version>
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 723d767..e411517 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -24,6 +24,7 @@
 import java.rmi.registry.LocateRegistry;
 import java.rmi.registry.Registry;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -55,6 +56,8 @@
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.eclipse.jetty.server.handler.ContextHandler;
+import org.json.JSONArray;
+import org.json.JSONObject;
 
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
 import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
@@ -71,9 +74,10 @@
 import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
+import edu.uci.ics.hyracks.control.cc.web.handlers.util.IJSONOutputFunction;
+import edu.uci.ics.hyracks.control.cc.web.handlers.util.JSONOutputRequestHandler;
+import edu.uci.ics.hyracks.control.cc.web.handlers.util.RoutingHandler;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -117,6 +121,7 @@
         webServer = new WebServer();
         webServer.addHandler(getAdminConsoleHandler());
         webServer.addHandler(getApplicationInstallationHandler());
+        webServer.addHandler(getRestAPIHandler());
         this.timer = new Timer(true);
     }
 
@@ -197,14 +202,13 @@
                     } catch (Exception e) {
                     }
                 }
-
             });
         }
     }
 
     @Override
     public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
-            StageletStatistics statistics) throws Exception {
+            Map<String, Long> statistics) throws Exception {
         jobManager.notifyStageletComplete(jobId, stageId, attempt, nodeId, statistics);
     }
 
@@ -224,14 +228,48 @@
     }
 
     @Override
-    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
-        return jobManager.waitForCompletion(jobId);
+    public void waitForCompletion(UUID jobId) throws Exception {
+        jobManager.waitForCompletion(jobId);
     }
 
     @Override
-    public void reportProfile(String id, Map<String, Long> counterDump) throws Exception {
-        if (LOGGER.isLoggable(Level.INFO))
+    public void reportProfile(String id, Map<UUID, Map<String, Long>> counterDump) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Profile: " + id + ": " + counterDump);
+        }
+        jobManager.reportProfile(id, counterDump);
+    }
+
+    private Handler getRestAPIHandler() {
+        ContextHandler handler = new ContextHandler("/state");
+        RoutingHandler rh = new RoutingHandler();
+        rh.addHandler("jobs", new JSONOutputRequestHandler(new IJSONOutputFunction() {
+            @Override
+            public JSONObject invoke(String[] arguments) throws Exception {
+                JSONObject result = new JSONObject();
+                switch (arguments.length) {
+                    case 1:
+                        if (!"".equals(arguments[0])) {
+                            break;
+                        }
+                    case 0:
+                        result.put("result", jobManager.getQueryInterface().getAllJobSummaries());
+                        break;
+
+                    case 2:
+                        UUID jobId = UUID.fromString(arguments[0]);
+
+                        if ("spec".equalsIgnoreCase(arguments[1])) {
+                            result.put("result", jobManager.getQueryInterface().getJobSpecification(jobId));
+                        } else if ("profile".equalsIgnoreCase(arguments[1])) {
+                            result.put("result", jobManager.getQueryInterface().getJobProfile(jobId));
+                        }
+                }
+                return result;
+            }
+        }));
+        handler.setHandler(rh);
+        return handler;
     }
 
     private Handler getAdminConsoleHandler() {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
index bb90d0b..1f14b3b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
@@ -15,13 +15,12 @@
 package edu.uci.ics.hyracks.control.cc;
 
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 
 public interface IJobManager {
     public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
@@ -29,15 +28,19 @@
     public void start(UUID jobId) throws Exception;
 
     public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
-            StageletStatistics statistics) throws Exception;
+            Map<String, Long> statistics) throws Exception;
 
     public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
 
     public JobStatus getJobStatus(UUID jobId);
 
-    public JobStatistics waitForCompletion(UUID jobId) throws Exception;
+    public void waitForCompletion(UUID jobId) throws Exception;
 
     public void notifyNodeFailure(String nodeId) throws Exception;
 
     public void registerNode(String nodeId) throws Exception;
+
+    public void reportProfile(String id, Map<UUID, Map<String, Long>> counterDump) throws Exception;
+
+    public IJobManagerQueryInterface getQueryInterface();
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManagerQueryInterface.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManagerQueryInterface.java
new file mode 100644
index 0000000..221fc10
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManagerQueryInterface.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc;
+
+import java.util.UUID;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+public interface IJobManagerQueryInterface {
+    public JSONArray getAllJobSummaries() throws Exception;
+
+    public JSONObject getJobSpecification(UUID jobId) throws Exception;
+
+    public JSONObject getJobPlan(UUID jobId) throws Exception;
+
+    public JSONObject getJobProfile(UUID jobId) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
index 22cce67..9ed4f00 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
@@ -36,6 +36,10 @@
 import jol.types.table.Function;
 import jol.types.table.Key;
 import jol.types.table.TableName;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+
 import edu.uci.ics.hyracks.api.comm.Endpoint;
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
@@ -56,9 +60,6 @@
 import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 
 public class JOLJobManagerImpl implements IJobManager {
     private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
@@ -113,8 +114,12 @@
 
     private final ExpandPartitionCountConstraintTableFunction expandPartitionCountConstraintFunction;
 
+    private final ProfileUpdateTable puTable;
+
     private final List<String> rankedAvailableNodes;
 
+    private final IJobManagerQueryInterface qi;
+
     public JOLJobManagerImpl(final ClusterControllerService ccs, final Runtime jolRuntime) throws Exception {
         this.jolRuntime = jolRuntime;
         jobQueue = new LinkedBlockingQueue<Runnable>();
@@ -141,6 +146,7 @@
         this.abortMessageTable = new AbortMessageTable(jolRuntime);
         this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
         this.expandPartitionCountConstraintFunction = new ExpandPartitionCountConstraintTableFunction();
+        this.puTable = new ProfileUpdateTable();
         this.rankedAvailableNodes = new ArrayList<String>();
 
         jolRuntime.catalog().register(jobTable);
@@ -163,6 +169,7 @@
         jolRuntime.catalog().register(abortMessageTable);
         jolRuntime.catalog().register(abortNotifyTable);
         jolRuntime.catalog().register(expandPartitionCountConstraintFunction);
+        jolRuntime.catalog().register(puTable);
 
         jobTable.register(new JobTable.Callback() {
             @Override
@@ -366,6 +373,8 @@
 
         jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
         jolRuntime.evaluate();
+
+        qi = new QueryInterfaceImpl();
     }
 
     @Override
@@ -476,6 +485,15 @@
     }
 
     @Override
+    public void reportProfile(String id, Map<UUID, Map<String, Long>> counterDump) throws Exception {
+        BasicTupleSet puTuples = new BasicTupleSet();
+        for (Map.Entry<UUID, Map<String, Long>> e : counterDump.entrySet()) {
+            puTuples.add(ProfileUpdateTable.createTuple(e.getKey(), id, e.getValue()));
+        }
+        jolRuntime.schedule(JOL_SCOPE, ProfileUpdateTable.TABLE_NAME, puTuples, null);
+    }
+
+    @Override
     public JobStatus getJobStatus(UUID jobId) {
         synchronized (jobTable) {
             try {
@@ -530,7 +548,7 @@
 
     @Override
     public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
-            StageletStatistics statistics) throws Exception {
+            Map<String, Long> statistics) throws Exception {
         BasicTupleSet scTuples = new BasicTupleSet();
         scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics));
 
@@ -582,14 +600,31 @@
     }
 
     @Override
-    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+    public void waitForCompletion(UUID jobId) throws Exception {
         synchronized (jobTable) {
             Tuple jobTuple = null;
             while ((jobTuple = jobTable.lookupJob(jobId)) != null
                     && jobTuple.value(JobTable.JOBSTATUS_FIELD_INDEX) != JobStatus.TERMINATED) {
                 jobTable.wait();
             }
-            return jobTuple == null ? null : jobTable.buildJobStatistics(jobTuple);
+        }
+    }
+
+    @Override
+    public IJobManagerQueryInterface getQueryInterface() {
+        return qi;
+    }
+
+    public void visitJobs(ITupleProcessor processor) throws Exception {
+        for (Tuple t : jobTable.tuples()) {
+            processor.process(t);
+        }
+    }
+
+    public void visitJob(UUID jobId, ITupleProcessor processor) throws Exception {
+        Tuple job = jobTable.lookupJob(jobId);
+        if (job != null) {
+            processor.process(job);
         }
     }
 
@@ -603,7 +638,7 @@
 
         @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] { UUID.class, String.class, JobStatus.class,
-                JobSpecification.class, JobPlan.class, Set.class };
+                JobSpecification.class, JobPlan.class, Map.class };
 
         public static final int JOBID_FIELD_INDEX = 0;
         public static final int APPNAME_FIELD_INDEX = 1;
@@ -618,24 +653,7 @@
 
         @SuppressWarnings("unchecked")
         static Tuple createInitialJobTuple(UUID jobId, String appName, JobSpecification jobSpec, JobPlan plan) {
-            return new Tuple(jobId, appName, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
-        }
-
-        @SuppressWarnings("unchecked")
-        JobStatistics buildJobStatistics(Tuple jobTuple) {
-            Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple
-                    .value(JobTable.STATISTICS_FIELD_INDEX);
-            JobStatistics stats = new JobStatistics();
-            if (statsSet != null) {
-                for (Set<StageletStatistics> stageStatsSet : statsSet) {
-                    StageStatistics stageStats = new StageStatistics();
-                    for (StageletStatistics stageletStats : stageStatsSet) {
-                        stageStats.addStageletStatistics(stageletStats);
-                    }
-                    stats.addStageStatistics(stageStats);
-                }
-            }
-            return stats;
+            return new Tuple(jobId, appName, JobStatus.INITIALIZED, jobSpec, plan, new HashMap());
         }
 
         Tuple lookupJob(UUID jobId) throws BadKeyException {
@@ -876,7 +894,7 @@
     }
 
     /*
-     * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics})
+     * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
      */
     private static class StageletCompleteTable extends BasicTable {
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletcomplete");
@@ -885,14 +903,14 @@
 
         @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class,
-                StageletStatistics.class };
+                Map.class };
 
         public StageletCompleteTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
 
         public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt,
-                StageletStatistics statistics) {
+                Map<String, Long> statistics) {
             return new Tuple(jobId, stageId, nodeId, attempt, statistics);
         }
     }
@@ -1044,6 +1062,24 @@
         }
     }
 
+    /*
+     * declare(profileupdate, keys(0, 1), {JobId, NodeId, Map})
+     */
+    private static class ProfileUpdateTable extends EventTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "profileupdate");
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, String.class, Map.class };
+
+        public ProfileUpdateTable() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, String nodeId, Map<String, Long> statistics) {
+            return new Tuple(jobId, nodeId, statistics);
+        }
+    }
+
     private class JobQueueThread extends Thread {
         public JobQueueThread() {
             setDaemon(true);
@@ -1065,4 +1101,78 @@
             }
         }
     }
+
+    public interface ITupleProcessor {
+        public void process(Tuple t) throws Exception;
+    }
+
+    private class QueryInterfaceImpl implements IJobManagerQueryInterface {
+        @Override
+        public JSONArray getAllJobSummaries() throws Exception {
+            final JSONArray jobs = new JSONArray();
+            JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() {
+                @Override
+                public void process(Tuple t) throws Exception {
+                    JSONObject jo = new JSONObject();
+                    jo.put("type", "job-summary");
+                    jo.put("id", t.value(JobTable.JOBID_FIELD_INDEX).toString());
+                    jo.put("status", t.value(JobTable.JOBSTATUS_FIELD_INDEX).toString());
+                    jobs.put(jo);
+                }
+            };
+            visitJobs(tp);
+            return jobs;
+        }
+
+        @Override
+        public JSONObject getJobSpecification(UUID jobId) throws Exception {
+            final JSONArray jobs = new JSONArray();
+            JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() {
+                @Override
+                public void process(Tuple t) throws Exception {
+                    JobSpecification js = (JobSpecification) t.value(JobTable.JOBSPEC_FIELD_INDEX);
+                    jobs.put(js.toJSON());
+                }
+            };
+            visitJob(jobId, tp);
+            return (JSONObject) (jobs.length() == 0 ? new JSONObject() : jobs.get(0));
+        }
+
+        @Override
+        public JSONObject getJobPlan(UUID jobId) throws Exception {
+            final JSONArray jobs = new JSONArray();
+            JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() {
+                @Override
+                public void process(Tuple t) throws Exception {
+                }
+            };
+            visitJob(jobId, tp);
+            return (JSONObject) (jobs.length() == 0 ? new JSONObject() : jobs.get(0));
+        }
+
+        @Override
+        public JSONObject getJobProfile(UUID jobId) throws Exception {
+            final JSONArray jobs = new JSONArray();
+            JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() {
+                @Override
+                public void process(Tuple t) throws Exception {
+                    JSONObject jo = new JSONObject();
+                    jo.put("type", "profile");
+                    jo.put("id", t.value(JobTable.JOBID_FIELD_INDEX).toString());
+                    Map<String, Long> profile = (Map<String, Long>) t.value(JobTable.STATISTICS_FIELD_INDEX);
+                    if (profile != null) {
+                        for (Map.Entry<String, Long> e : profile.entrySet()) {
+                            JSONObject jpe = new JSONObject();
+                            jpe.put("name", e.getKey());
+                            jpe.put("value", e.getValue());
+                            jo.accumulate("counters", jpe);
+                        }
+                    }
+                    jobs.put(jo);
+                }
+            };
+            visitJob(jobId, tp);
+            return (JSONObject) (jobs.length() == 0 ? new JSONObject() : jobs.get(0));
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/StageProgress.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/StageProgress.java
index 438e235..102a011 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/StageProgress.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/StageProgress.java
@@ -18,20 +18,14 @@
 import java.util.Set;
 import java.util.UUID;
 
-import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
-
 public class StageProgress {
     private final UUID stageId;
 
     private final Set<String> pendingNodes;
 
-    private final StageStatistics stageStatistics;
-
     public StageProgress(UUID stageId) {
         this.stageId = stageId;
         pendingNodes = new HashSet<String>();
-        stageStatistics = new StageStatistics();
-        stageStatistics.setStageId(stageId);
     }
 
     public UUID getStageId() {
@@ -49,8 +43,4 @@
     public boolean stageComplete() {
         return pendingNodes.isEmpty();
     }
-
-    public StageStatistics getStageStatistics() {
-        return stageStatistics;
-    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/IJSONOutputFunction.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/IJSONOutputFunction.java
new file mode 100644
index 0000000..75edf2b
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/IJSONOutputFunction.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.web.handlers.util;
+
+import org.json.JSONObject;
+
+public interface IJSONOutputFunction {
+    public JSONObject invoke(String[] arguments) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/JSONOutputRequestHandler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/JSONOutputRequestHandler.java
new file mode 100644
index 0000000..c6b8d9c
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/JSONOutputRequestHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.web.handlers.util;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.json.JSONObject;
+
+public class JSONOutputRequestHandler extends AbstractHandler {
+    private final IJSONOutputFunction fn;
+
+    public JSONOutputRequestHandler(IJSONOutputFunction fn) {
+        this.fn = fn;
+    }
+
+    @Override
+    public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+            throws IOException, ServletException {
+        while (target.startsWith("/")) {
+            target = target.substring(1);
+        }
+        while (target.endsWith("/")) {
+            target = target.substring(0, target.length() - 1);
+        }
+        String[] parts = target.split("/");
+        try {
+            JSONObject result = fn.invoke(parts);
+            response.setContentType("application/json");
+            result.write(response.getWriter());
+            baseRequest.setHandled(true);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/RoutingHandler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/RoutingHandler.java
new file mode 100644
index 0000000..6afaf2a
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/RoutingHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.web.handlers.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+public class RoutingHandler extends AbstractHandler {
+    private final Map<String, Handler> handlers;
+
+    public RoutingHandler() {
+        handlers = new HashMap<String, Handler>();
+    }
+
+    public synchronized void addHandler(String route, Handler handler) {
+        handlers.put(route, handler);
+    }
+
+    public synchronized void removeHandler(String route) {
+        handlers.remove(route);
+    }
+
+    @Override
+    public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+            throws IOException, ServletException {
+        while (target.startsWith("/")) {
+            target = target.substring(1);
+        }
+        int idx = target.indexOf('/');
+        String path0 = target;
+        String rest = "";
+        if (idx >= 0) {
+            path0 = target.substring(0, idx);
+            rest = target.substring(idx);
+        }
+        Handler h;
+        synchronized (this) {
+            h = handlers.get(path0);
+        }
+        if (h != null) {
+            h.handle(rest, baseRequest, request, response);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
index 46188da..a717128 100644
--- a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
@@ -2,6 +2,8 @@
 
 import java.util.UUID;
 import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
 
 import jol.types.basic.Tuple;
 import jol.types.basic.TupleSet;
@@ -95,16 +97,23 @@
         NextAttempt := Attempt + 1;
     };
 
+job(JobId, AppName, Status, JobSpec, JobPlan, Stats) :-
+    job(JobId, AppName, Status, JobSpec, JobPlan, Stats),
+    profileupdate(JobId, _, JobletStats)
+    {
+        Stats.putAll(JobletStats);
+    };
+
 define(stagestart, keys(), {UUID, Integer, Integer});
-define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Set});
+define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Map});
 
 watch(jobstart, i);
 
 stagestart_INITIAL stagestart(JobId, 0, Attempt) :-
     jobattempt#insert(JobId, Attempt);
 
-update_job_status_RUNNING job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
-    job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
+update_job_status_RUNNING job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, Stats) :-
+    job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, Stats),
     jobstart(JobId, _);
 
 stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :-
@@ -287,22 +296,33 @@
     stageabort(JobId, StageId, Attempt, NodeIdSet2),
     NodeIdSet1.size() == NodeIdSet2.size();
 
-define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+define(stageletcompletecount, keys(0, 1, 2), {UUID, UUID, Integer, Integer});
 
-stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
+stageletcompletecount(JobId, StageId, Attempt, count<NodeId>) :-
+    stageletcomplete(JobId, StageId, NodeId, Attempt, _);
+
+define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Map});
+
+stageletcomplete_agg(JobId, StageId, Attempt, generic<(new HashMap()).putAll(Statistics)>) :-
     stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
 
-stagefinish(JobId, StageNumber, Attempt, SSet) :-
+stagefinish(JobId, StageNumber, Attempt, SMap) :-
     startmessage_agg(JobId, StageId, Attempt, _, _, TSet),
-    stageletcomplete_agg(JobId, StageId, Attempt, SSet),
+    stageletcompletecount(JobId, StageId, Attempt, Count),
+    stageletcomplete_agg(JobId, StageId, Attempt, SMap),
     jobstage(JobId, StageNumber, StageId),
-    TSet.size() == SSet.size();
+    TSet.size() == Count;
 
-update_job_status_TERMINATED job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
-    job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
+update_job_status_TERMINATED job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, NewStats) :-
+    job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, Stats),
     stagestart#insert(JobId, StageNumber, Attempt),
-    stagefinish(JobId, _, Attempt, SSet),
-    notin jobstage(JobId, StageNumber);
+    stagefinish(JobId, _, Attempt, SMap),
+    notin jobstage(JobId, StageNumber)
+    {
+        NewStats := new HashMap();
+        NewStats.putAll(Stats);
+        NewStats.putAll(SMap);
+    };
 
 define(jobcleanup_agg, {UUID, Set});
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index f83a3c1..b48be8f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -24,7 +24,6 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 
 public class Joblet {
     private static final long serialVersionUID = 1L;
@@ -89,7 +88,7 @@
         return nodeController.getExecutor();
     }
 
-    public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletStatistics stats)
+    public synchronized void notifyStageletComplete(UUID stageId, int attempt, Map<String, Long> stats)
             throws Exception {
         stageletMap.remove(stageId);
         nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index d9c5cda..3690ed7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -31,8 +31,6 @@
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -75,7 +73,6 @@
 import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -464,7 +461,7 @@
         }
     }
 
-    public void notifyStageComplete(UUID jobId, UUID stageId, int attempt, StageletStatistics stats) throws Exception {
+    public void notifyStageComplete(UUID jobId, UUID stageId, int attempt, Map<String, Long> stats) throws Exception {
         try {
             ccs.notifyStageletComplete(jobId, stageId, attempt, id, stats);
         } catch (Exception e) {
@@ -519,21 +516,25 @@
         @Override
         public void run() {
             try {
-                Map<String, Long> counterDump = new TreeMap<String, Long>();
+                Map<UUID, Map<String, Long>> counterDump = new HashMap<UUID, Map<String, Long>>();
                 Set<UUID> jobIds;
                 synchronized (NodeControllerService.this) {
                     jobIds = new HashSet<UUID>(jobletMap.keySet());
                 }
-                for(UUID jobId : jobIds) {
+                for (UUID jobId : jobIds) {
                     Joblet ji;
                     synchronized (NodeControllerService.this) {
                         ji = jobletMap.get(jobId);
                     }
                     if (ji != null) {
-                        ji.dumpProfile(counterDump);
+                        Map<String, Long> jobletCounterDump = new HashMap<String, Long>();
+                        ji.dumpProfile(jobletCounterDump);
+                        counterDump.put(jobId, jobletCounterDump);
                     }
                 }
-                cc.reportProfile(id, counterDump);
+                if (!counterDump.isEmpty()) {
+                    cc.reportProfile(id, counterDump);
+                }
             } catch (Exception e) {
                 e.printStackTrace();
             }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index afbb95d..0f53f26 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -15,12 +15,12 @@
 package edu.uci.ics.hyracks.control.nc;
 
 import java.rmi.RemoteException;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -28,7 +28,6 @@
 import edu.uci.ics.hyracks.api.comm.Endpoint;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 import edu.uci.ics.hyracks.control.nc.job.profiling.CounterContext;
 import edu.uci.ics.hyracks.control.nc.runtime.OperatorRunnable;
 
@@ -55,8 +54,6 @@
 
     private final Set<OperatorInstanceId> pendingOperators;
 
-    private final StageletStatistics stats;
-
     public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId) throws RemoteException {
         this.joblet = joblet;
         this.stageId = stageId;
@@ -65,8 +62,6 @@
         started = false;
         honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
         stageletCounterContext = new CounterContext(joblet.getJobId() + "." + stageId + "." + nodeId);
-        stats = new StageletStatistics();
-        stats.setNodeId(nodeId);
     }
 
     public void setOperator(OperatorDescriptorId odId, int partition, OperatorRunnable hon) {
@@ -94,7 +89,6 @@
             throw new Exception("Joblet already started");
         }
         started = true;
-        stats.setStartTime(new Date());
         notifyAll();
     }
 
@@ -145,8 +139,9 @@
     protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
         pendingOperators.remove(opIId);
         if (pendingOperators.isEmpty()) {
-            stats.setEndTime(new Date());
             try {
+                Map<String, Long> stats = new TreeMap<String, Long>();
+                dumpProfile(stats);
                 joblet.notifyStageletComplete(stageId, attempt, stats);
             } catch (Exception e) {
                 e.printStackTrace();
@@ -169,10 +164,6 @@
         }
     }
 
-    public StageletStatistics getStatistics() {
-        return stats;
-    }
-
     public void dumpProfile(Map<String, Long> counterDump) {
         stageletCounterContext.dump(counterDump);
     }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index c0b6d34..858ff6c 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -42,6 +42,7 @@
     public static void init() throws Exception {
         CCConfig ccConfig = new CCConfig();
         ccConfig.port = 39001;
+        ccConfig.profileDumpPeriod = 1000;
         ccConfig.useJOL = true;
         cc = new ClusterControllerService(ccConfig);
         cc.start();
@@ -72,12 +73,12 @@
         nc1.stop();
         cc.stop();
     }
-    
+
     protected void runTest(JobSpecification spec) throws Exception {
         UUID jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         System.err.println(spec.toJSON());
         hcc.start(jobId);
         System.err.print(jobId);
-        System.err.println(cc.waitForCompletion(jobId));
+        cc.waitForCompletion(jobId);
     }
 }
\ No newline at end of file