Merge "Merge branch 'trinity' into 'master'"
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index bb93fbc..9d94327 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -114,6 +114,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * Provides helper methods for compilation of a query into a JobSpec and submission
@@ -131,6 +132,7 @@
     private final IRuleSetFactory ruleSetFactory;
     private final Set<String> configurableParameterNames;
     private final ExecutionPlans executionPlans;
+    private PlanInfo lastPlan;
 
     public APIFramework(ILangCompilationProvider compilationProvider) {
         this.rewriterFactory = compilationProvider.getRewriterFactory();
@@ -139,6 +141,22 @@
         this.ruleSetFactory = compilationProvider.getRuleSetFactory();
         this.configurableParameterNames = compilationProvider.getCompilerOptions();
         executionPlans = new ExecutionPlans();
+        lastPlan = null;
+    }
+
+    private class PlanInfo {
+        ILogicalPlan plan;
+        Map<Object, String> log2Phys;
+        boolean printOptimizerEstimates;
+        SessionConfig.PlanFormat format;
+
+        public PlanInfo(ILogicalPlan plan, Map<Object, String> log2Phys, boolean printOptimizerEstimates,
+                SessionConfig.PlanFormat format) {
+            this.plan = plan;
+            this.log2Phys = log2Phys;
+            this.printOptimizerEstimates = printOptimizerEstimates;
+            this.format = format;
+        }
     }
 
     private static class OptimizationContextFactory implements IOptimizationContextFactory {
@@ -328,6 +346,7 @@
             if (isQuery || isLoad || isCopy) {
                 generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(),
                         cboMode);
+                lastPlan = new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
             }
         }
 
@@ -536,6 +555,20 @@
                 getPrettyPrintVisitor(format).printPlan(plan, log2phys, printOptimizerEstimates).toString());
     }
 
+    public void generateOptimizedLogicalPlanWithProfile(ObjectNode profile) throws HyracksDataException {
+        /*TODO(ian): we call this and overwrite the non-annotated plan, but there should be some way to skip initial
+                     plan printing if both profiling and plan printing are requested. */
+        try {
+            if (lastPlan != null) {
+                executionPlans.setOptimizedLogicalPlan(getPrettyPrintVisitor(lastPlan.format)
+                        .printPlan(lastPlan.plan, lastPlan.log2Phys, lastPlan.printOptimizerEstimates, profile)
+                        .toString());
+            }
+        } catch (AlgebricksException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
     private void generateOptimizedLogicalPlan(ILogicalPlan plan, SessionConfig.PlanFormat format,
             boolean printOptimizerEstimates) throws AlgebricksException {
         executionPlans.setOptimizedLogicalPlan(
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 7cf7938..3456427 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4727,6 +4727,7 @@
         stats.setProcessedObjects(resultMetadata.getProcessedObjects());
         if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) {
             stats.setJobProfile(resultMetadata.getJobProfile());
+            apiFramework.generateOptimizedLogicalPlanWithProfile(resultMetadata.getJobProfile());
         }
         stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
         WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
index b056d2c..56169e7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
@@ -57,7 +57,7 @@
 
     @Parameters(name = "SqlppProfiledExecutionTest {index}: {0}")
     public static Collection<Object[]> tests() throws Exception {
-        return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_sqlpp_profiled.xml");
+        return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_sqlpp.xml");
     }
 
     protected TestCaseContext tcCtx;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
index 0c4fae1..88107b0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -34,12 +34,12 @@
                     "counters": [
                         {
                             "name": "R{.+}",
-                            "time": "R{[0-9.]+}",
+                            "run-time": "R{[0-9.]+}",
                             "runtime-id": "R{.+}"
                         },
                         {
                             "name": "R{.+}",
-                            "time": "R{[0-9.]+}",
+                            "run-time": "R{[0-9.]+}",
                             "runtime-id": "R{.+}",
                             "pages-read": "R{[0-9.]+}",
                             "pages-read-cold": "R{[0-9.]+}",
@@ -50,7 +50,7 @@
                         },
                         {
                             "name": "R{.+}",
-                            "time": "R{[0-9.]+}",
+                            "run-time": "R{[0-9.]+}",
                             "runtime-id": "R{.+}",
                             "cardinality-out": "R{[0-9.]+}",
                             "avg-tuple-size": "R{[0-9.]+}",
@@ -67,7 +67,7 @@
                   "counters": [
                     {
                         "name": "R{.+}",
-                        "time": "R{[0-9.]+}",
+                        "run-time": "R{[0-9.]+}",
                         "runtime-id": "R{.+}",
                         "cardinality-out": "R{[0-9.]+}",
                         "avg-tuple-size": "R{[0-9.]+}",
@@ -76,7 +76,7 @@
                     },
                     {
                         "name": "R{.+}",
-                        "time": "R{[0-9.]+}",
+                        "run-time": "R{[0-9.]+}",
                         "runtime-id": "R{.+}"
                     }
                   ]
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index 0d8891c..aba35f4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -111,5 +111,9 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
index 2b53de4..9f138b5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
@@ -25,6 +25,8 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 /**
  * Note: Some implementations may be stateful and not thread-safe.
  */
@@ -49,6 +51,10 @@
     IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys, boolean printOptimizerEstimates)
             throws AlgebricksException;
 
+    /** Prints the logical plan, annotated with physical operator and connector ids, and profiling info*/
+    IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys, boolean printOptimizerEstimates,
+            ObjectNode profile) throws AlgebricksException;
+
     /** Resets the state of the pretty printer. */
     IPlanPrettyPrinter reset() throws AlgebricksException;
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index e7fc5f9..f49b6d4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -74,6 +74,8 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPrettyPrintVisitor<Integer>
         implements IPlanPrettyPrinter {
 
@@ -106,6 +108,14 @@
     }
 
     @Override
+    public IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys,
+            boolean printOptimizerEstimates, ObjectNode profile) throws AlgebricksException {
+        //TODO(ian): add times
+        printPlanImpl(plan, 0, printOptimizerEstimates);
+        return this;
+    }
+
+    @Override
     public final IPlanPrettyPrinter printOperator(AbstractLogicalOperator op, boolean printInputs,
             boolean printOptimizerEstimates) throws AlgebricksException {
         printOperatorImpl(op, 0, printInputs, printOptimizerEstimates);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 4360c67..6f65a9d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -80,12 +80,15 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.util.DefaultIndenter;
 import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperatorPrettyPrintVisitor<Void>
         implements IPlanPrettyPrinter {
@@ -102,6 +105,7 @@
     private static final String OPTIMIZER_ESTIMATES = "optimizer-estimates";
     private final Map<AbstractLogicalOperator, String> operatorIdentity = new HashMap<>();
     private Map<Object, String> log2odid = Collections.emptyMap();
+    private Map<String, ProfileInfo> profile = Collections.emptyMap();
     private final IdCounter idCounter = new IdCounter();
     private final JsonGenerator jsonGenerator;
 
@@ -149,6 +153,50 @@
         }
     }
 
+    private class ProfileInfo {
+        Map<Integer, Pair<Double, Double>> activities;
+
+        ProfileInfo() {
+            activities = new HashMap<>();
+        }
+
+        void visit(int id, double time) {
+            Pair<Double, Double> times = activities.computeIfAbsent(id, i -> new Pair(time, time));
+            if (times.getFirst() > time) {
+                times.setFirst(time);
+            }
+            if (times.getSecond() < time) {
+                times.setSecond(time);
+            }
+        }
+    }
+
+    private static ActivityId acIdFromName(String name) {
+        String[] parts = name.split(" - ");
+        return ActivityId.parse(parts[0]);
+    }
+
+    Map<String, ProfileInfo> processProfile(ObjectNode profile) {
+        Map<String, ProfileInfo> profiledOps = new HashMap<>();
+        for (JsonNode joblet : profile.get("joblets")) {
+            for (JsonNode task : joblet.get("tasks")) {
+                for (JsonNode counters : task.get("counters")) {
+                    ProfileInfo info =
+                            profiledOps.computeIfAbsent(counters.get("runtime-id").asText(), i -> new ProfileInfo());
+                    info.visit(acIdFromName(counters.get("name").asText()).getLocalId(),
+                            counters.get("run-time").asDouble());
+                }
+                for (JsonNode partition : task.get("partition-send-profile")) {
+                    String id = partition.get("partition-id").get("connector-id").asText();
+                    ProfileInfo info = profiledOps.computeIfAbsent(id, i -> new ProfileInfo());
+                    //CDIDs are unique
+                    info.visit(0, partition.get("close-time").asDouble() - partition.get("open-time").asDouble());
+                }
+            }
+        }
+        return profiledOps;
+    }
+
     @Override
     public final IPlanPrettyPrinter reset() throws AlgebricksException {
         flushContentToWriter();
@@ -175,6 +223,16 @@
     }
 
     @Override
+    public IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys,
+            boolean printOptimizerEstimates, ObjectNode profile) throws AlgebricksException {
+        this.log2odid = log2phys;
+        this.profile = processProfile(profile);
+        printPlanImpl(plan, printOptimizerEstimates);
+        flushContentToWriter();
+        return this;
+    }
+
+    @Override
     public final IPlanPrettyPrinter printOperator(AbstractLogicalOperator op, boolean printInputs,
             boolean printOptimizerEstimates) throws AlgebricksException {
         printOperatorImpl(op, printInputs, printOptimizerEstimates);
@@ -208,6 +266,23 @@
             String od = log2odid.get(op);
             if (od != null) {
                 jsonGenerator.writeStringField("runtime-id", od);
+                ProfileInfo info = profile.get(od);
+                if (info != null) {
+                    if (info.activities.size() == 1) {
+                        jsonGenerator.writeNumberField("min-time", info.activities.get(0).first);
+                        jsonGenerator.writeNumberField("max-time", info.activities.get(0).second);
+                    } else {
+                        jsonGenerator.writeObjectFieldStart("times");
+                        for (Map.Entry<Integer, Pair<Double, Double>> ac : info.activities.entrySet()) {
+                            jsonGenerator.writeObjectFieldStart(ac.getKey().toString());
+                            jsonGenerator.writeNumberField("min-time", ac.getValue().first);
+                            jsonGenerator.writeNumberField("max-time", ac.getValue().second);
+                            jsonGenerator.writeEndObject();
+                        }
+                        jsonGenerator.writeEndObject();
+                    }
+
+                }
             }
             IPhysicalOperator pOp = op.getPhysicalOperator();
             if (pOp != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
index 5b9495a..dc53bca 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
@@ -20,6 +20,7 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
 import org.apache.hyracks.api.comm.FrameConstants;
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -29,129 +30,96 @@
 import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
+import org.apache.hyracks.api.util.HyracksRunnable;
+import org.apache.hyracks.api.util.HyracksThrowingConsumer;
 import org.apache.hyracks.util.IntSerDeUtils;
 
-public class ProfiledFrameWriter implements IFrameWriter, IPassableTimer {
+public class ProfiledFrameWriter implements IFrameWriter {
 
     // The downstream data consumer of this writer.
     private final IFrameWriter writer;
-    private long frameStart = 0;
-    final ICounter timeCounter;
-    final ICounter tupleCounter;
-    final IStatsCollector collector;
-    final IOperatorStats stats;
-    final IOperatorStats parentStats;
+    private final ICounter tupleCounter;
+    private final IOperatorStats parentStats;
     private int minSz = Integer.MAX_VALUE;
     private int maxSz = -1;
     private long avgSz;
-    final String name;
+    private ICounter totalTime;
 
-    public ProfiledFrameWriter(IFrameWriter writer, IStatsCollector collector, String name, IOperatorStats stats,
-            IOperatorStats parentStats) {
+    public ProfiledFrameWriter(IFrameWriter writer, IOperatorStats parentStats) {
         this.writer = writer;
-        this.collector = collector;
-        this.name = name;
-        this.stats = stats;
         this.parentStats = parentStats;
-        this.timeCounter = stats.getTimeCounter();
         this.tupleCounter = parentStats != null ? parentStats.getTupleCounter() : null;
+        this.totalTime = new Counter("totalTime");
+    }
+
+    public static void timeMethod(HyracksRunnable r, ICounter c) throws HyracksDataException {
+        long nt = 0;
+        try {
+            nt = System.nanoTime();
+            r.run();
+        } finally {
+            c.update(System.nanoTime() - nt);
+        }
+    }
+
+    private void timeMethod(HyracksThrowingConsumer<ByteBuffer> c, ByteBuffer buffer) throws HyracksDataException {
+        long nt = 0;
+        try {
+            nt = System.nanoTime();
+            c.accept(buffer);
+        } finally {
+            totalTime.update(System.nanoTime() - nt);
+        }
     }
 
     @Override
     public final void open() throws HyracksDataException {
-        try {
-            startClock();
-            writer.open();
-        } finally {
-            stopClock();
+        timeMethod(writer::open, totalTime);
+    }
+
+    private void updateTupleStats(ByteBuffer buffer) {
+        int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
+        int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
+        if (tupleCounter != null) {
+            long prevCount = tupleCounter.get();
+            for (int i = 0; i < tupleCount; i++) {
+                int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
+                if (maxSz < tupleLen) {
+                    maxSz = tupleLen;
+                }
+                if (minSz > tupleLen) {
+                    minSz = tupleLen;
+                }
+                long prev = avgSz * prevCount;
+                avgSz = (prev + tupleLen) / (prevCount + 1);
+                prevCount++;
+            }
+            parentStats.getMaxTupleSz().set(maxSz);
+            parentStats.getMinTupleSz().set(minSz);
+            parentStats.getAverageTupleSz().set(avgSz);
+            tupleCounter.update(tupleCount);
         }
     }
 
     @Override
     public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        try {
-            int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
-            int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
-            if (tupleCounter != null) {
-                long prevCount = tupleCounter.get();
-                for (int i = 0; i < tupleCount; i++) {
-                    int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
-                    if (maxSz < tupleLen) {
-                        maxSz = tupleLen;
-                    }
-                    if (minSz > tupleLen) {
-                        minSz = tupleLen;
-                    }
-                    long prev = avgSz * prevCount;
-                    avgSz = (prev + tupleLen) / (prevCount + 1);
-                    prevCount++;
-                }
-                parentStats.getMaxTupleSz().set(maxSz);
-                parentStats.getMinTupleSz().set(minSz);
-                parentStats.getAverageTupleSz().set(avgSz);
-                tupleCounter.update(tupleCount);
-            }
-            startClock();
-            writer.nextFrame(buffer);
-        } finally {
-            stopClock();
-        }
+        updateTupleStats(buffer);
+        timeMethod(writer::nextFrame, buffer);
     }
 
     @Override
     public final void flush() throws HyracksDataException {
-        try {
-            startClock();
-            writer.flush();
-        } finally {
-            stopClock();
-        }
+        timeMethod(writer::flush, totalTime);
     }
 
     @Override
     public final void fail() throws HyracksDataException {
-        writer.fail();
+        timeMethod(writer::fail, totalTime);
     }
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            startClock();
-            writer.close();
-        } finally {
-            stopClock();
-        }
-    }
-
-    private void stopClock() {
-        pause();
-        collector.giveClock(this);
-    }
-
-    private void startClock() {
-        if (frameStart > 0) {
-            return;
-        }
-        frameStart = collector.takeClock(this);
-    }
-
-    @Override
-    public void resume() {
-        if (frameStart > 0) {
-            return;
-        }
-        long nt = System.nanoTime();
-        frameStart = nt;
-    }
-
-    @Override
-    public void pause() {
-        if (frameStart > 1) {
-            long nt = System.nanoTime();
-            long delta = nt - frameStart;
-            timeCounter.update(delta);
-            frameStart = -1;
-        }
+        timeMethod(writer::close, totalTime);
     }
 
     private int getTupleStartOffset(int tupleIndex, int tupleCountOffset, ByteBuffer buffer) {
@@ -174,9 +142,14 @@
             IStatsCollector statsCollector = ctx.getStatsCollector();
             IOperatorStats stats = new OperatorStats(name);
             statsCollector.add(stats);
-            return new ProfiledFrameWriter(writer, ctx.getStatsCollector(), name, stats, null);
+            return new ProfiledFrameWriter(writer, null);
 
         } else
             return writer;
     }
+
+    public long getTotalTime() {
+        return totalTime.get();
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
index f787a1c..bde5611 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
@@ -19,7 +19,9 @@
 package org.apache.hyracks.api.dataflow;
 
 import java.util.HashMap;
+import java.util.Map;
 
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -27,43 +29,45 @@
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.api.rewriter.runtime.SuperActivityOperatorNodePushable;
 
-public class ProfiledOperatorNodePushable extends ProfiledFrameWriter implements IOperatorNodePushable, IPassableTimer {
+public class ProfiledOperatorNodePushable implements IOperatorNodePushable {
 
-    IOperatorNodePushable op;
-    ProfiledOperatorNodePushable parentOp;
-    ActivityId acId;
-    HashMap<Integer, IFrameWriter> inputs;
-    long frameStart;
+    private final IOperatorNodePushable op;
+    private final Map<Integer, ProfiledFrameWriter> inputs;
+    private final Map<Integer, ProfiledOperatorNodePushable> parents;
+    private final Map<Integer, ProfiledFrameWriter> outputs;
+    private final IOperatorStats stats;
+    private final ICounter totalTime;
 
-    ProfiledOperatorNodePushable(IOperatorNodePushable op, ActivityId acId, IStatsCollector collector,
-            IOperatorStats stats, ActivityId parent, ProfiledOperatorNodePushable parentOp)
-            throws HyracksDataException {
-        super(null, collector, acId.toString() + " - " + op.getDisplayName(), stats,
-                parentOp != null ? parentOp.getStats() : null);
-        this.parentOp = parentOp;
+    ProfiledOperatorNodePushable(IOperatorNodePushable op, IOperatorStats stats,
+            ProfiledOperatorNodePushable parentOp) {
+        this.stats = stats;
+        this.parents = new HashMap<>();
+        parents.put(0, parentOp);
         this.op = op;
-        this.acId = acId;
         inputs = new HashMap<>();
+        outputs = new HashMap<>();
+        this.totalTime = new Counter("totalTime");
     }
 
     @Override
     public void initialize() throws HyracksDataException {
-        synchronized (collector) {
-            startClock();
-            op.initialize();
-            stopClock();
-        }
+        ProfiledFrameWriter.timeMethod(op::initialize, totalTime);
     }
 
     @Override
     public void deinitialize() throws HyracksDataException {
-        synchronized (collector) {
-            startClock();
-            op.deinitialize();
-            stopClock();
+        long ownTime = totalTime.get();
+        for (ProfiledFrameWriter i : inputs.values()) {
+            ownTime += i.getTotalTime();
         }
+        for (ProfiledFrameWriter w : outputs.values()) {
+            ownTime -= w.getTotalTime();
+        }
+        op.deinitialize();
+        stats.getTimeCounter().set(ownTime);
     }
 
     @Override
@@ -74,17 +78,23 @@
     @Override
     public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
             throws HyracksDataException {
+        if (writer instanceof ProfiledFrameWriter) {
+            ProfiledFrameWriter wrapper = (ProfiledFrameWriter) writer;
+            outputs.put(index, wrapper);
+        }
         op.setOutputFrameWriter(index, writer, recordDesc);
     }
 
     @Override
     public IFrameWriter getInputFrameWriter(int index) {
-        IFrameWriter ifw = op.getInputFrameWriter(index);
-        if (!(op instanceof ProfiledFrameWriter) && ifw.equals(op)) {
-            return new ProfiledFrameWriter(op.getInputFrameWriter(index), collector,
-                    acId.toString() + "-" + op.getDisplayName(), stats, parentStats);
+        if (inputs.get(index) == null) {
+            IOperatorStats parentStats = parents.get(index) == null ? null : parents.get(index).getStats();
+            ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index), parentStats);
+            inputs.put(index, pfw);
+            return pfw;
+        } else {
+            return inputs.get(index);
         }
-        return op.getInputFrameWriter(index);
     }
 
     @Override
@@ -92,45 +102,14 @@
         return op.getDisplayName();
     }
 
-    private void stopClock() {
-        pause();
-        collector.giveClock(this);
-    }
-
-    private void startClock() {
-        if (frameStart > 0) {
-            return;
-        }
-        frameStart = collector.takeClock(this);
-    }
-
-    @Override
-    public void resume() {
-        if (frameStart > 0) {
-            return;
-        }
-        long nt = System.nanoTime();
-        frameStart = nt;
-    }
-
-    @Override
-    public void pause() {
-        if (frameStart > 0) {
-            long nt = System.nanoTime();
-            long delta = nt - frameStart;
-            timeCounter.update(delta);
-            frameStart = -1;
-        }
+    public void addParent(int index, ProfiledOperatorNodePushable parent) {
+        parents.put(index, parent);
     }
 
     public IOperatorStats getStats() {
         return stats;
     }
 
-    public IOperatorStats getParentStats() {
-        return parentStats;
-    }
-
     public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId,
             ProfiledOperatorNodePushable source) throws HyracksDataException {
         String name = acId.toString() + " - " + op.getDisplayName();
@@ -141,7 +120,7 @@
             ((IIntrospectingOperator) op).setOperatorStats(stats);
         }
         if (!(op instanceof ProfiledOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) {
-            return new ProfiledOperatorNodePushable(op, acId, ctx.getStatsCollector(), stats, acId, source);
+            return new ProfiledOperatorNodePushable(op, stats, source);
         }
         return op;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
index d2ad20c..3169c81 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
@@ -21,7 +21,6 @@
 import java.io.Serializable;
 import java.util.Map;
 
-import org.apache.hyracks.api.dataflow.IPassableTimer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IWritable;
 
@@ -54,17 +53,4 @@
      */
     IOperatorStats getAggregatedStats();
 
-    /**
-     * Pause an operator's timer, to pass it to another operator
-     * @param newHolder the timer that is starting execution
-     * @return the current nanoTime when the clock was taken from the other operator
-     */
-    long takeClock(IPassableTimer newHolder);
-
-    /**
-     * Resume an operator's timer, when a downstream operator has finished execution of
-     * the method the upstream operator called
-     * @param currHolder the timer that needs to be paused
-     */
-    void giveClock(IPassableTimer currHolder);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index efd4e07..de28318 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -158,6 +158,12 @@
                 }
                 operatorNodePushablesBFSOrder.add(destOp);
                 operatorNodePushables.put(destId, destOp);
+            } else if (profile) {
+                if (destOp instanceof ProfiledOperatorNodePushable
+                        && sourceOp instanceof ProfiledOperatorNodePushable) {
+                    ((ProfiledOperatorNodePushable) destOp).addParent(inputChannel,
+                            (ProfiledOperatorNodePushable) sourceOp);
+                }
             }
 
             /*
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
similarity index 64%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
index 6afbccb..a82c150 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
@@ -16,17 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.api.dataflow;
+package org.apache.hyracks.api.util;
 
-public interface IPassableTimer {
-    /*
-    A timer intended to be used for timing the individual components of a
-    pipelined process. An instance of IPassableTimer is held by each method
-    in the pipeline, and is paused() when that method passes off control to
-    a component above it, and is resume()d when the component above it returns.
-     */
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-    void pause();
-
-    void resume();
+@FunctionalInterface
+public interface HyracksRunnable {
+    void run() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
index 41beac0..76c8017 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -21,23 +21,19 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayDeque;
 import java.util.Collections;
-import java.util.Deque;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import org.apache.hyracks.api.dataflow.IPassableTimer;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
 
 public class StatsCollector implements IStatsCollector {
-    private static final long serialVersionUID = 6858817639895434572L;
+    private static final long serialVersionUID = 6858817639895434573L;
 
     private final Map<String, IOperatorStats> operatorStatsMap = new LinkedHashMap<>();
-    private transient Deque<IPassableTimer> clockHolder = new ArrayDeque<>();
 
     @Override
     public void add(IOperatorStats operatorStats) {
@@ -91,23 +87,4 @@
         }
     }
 
-    @Override
-    public long takeClock(IPassableTimer newHolder) {
-        if (newHolder != null) {
-            if (clockHolder.peek() != null) {
-                clockHolder.peek().pause();
-            }
-            clockHolder.push(newHolder);
-        }
-        return System.nanoTime();
-    }
-
-    @Override
-    public void giveClock(IPassableTimer currHolder) {
-        clockHolder.removeLastOccurrence(currHolder);
-        if (clockHolder.peek() != null) {
-            clockHolder.peek().resume();
-        }
-    }
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 4036f00..546360a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -128,7 +128,7 @@
         opTimes.forEach((key, value) -> {
             ObjectNode jpe = om.createObjectNode();
             jpe.put("name", key);
-            jpe.put("time", Double
+            jpe.put("run-time", Double
                     .parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000)));
             if (value.getId().getId() >= 0) {
                 jpe.put("runtime-id", value.getId().toString());
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index 815536b..6e58e9c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -30,14 +30,12 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
 import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
 import org.apache.hyracks.dataflow.std.sort.IRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.ProfiledRunGenerator;
 
 /**
  * This Operator pushes group-by aggregation into the external sort.
@@ -143,13 +141,11 @@
             @Override
             protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescriptorProvider) throws HyracksDataException {
-                final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
                 IRunGenerator runGen = new ExternalSortGroupByRunGenerator(ctx, sortFields,
                         recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), framesLimit,
                         groupFields, keyNormalizerFactories, comparatorFactories, partialAggregatorFactory,
                         partialAggRecordDesc, ALG);
-                return profile ? ProfiledRunGenerator.time(runGen, ctx, "GroupBy (Sort Runs)", this.getActivityId())
-                        : runGen;
+                return runGen;
             }
         };
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
index 67b0686..dc2b46f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
@@ -224,6 +224,11 @@
         }
 
         @Override
+        public String getDisplayName() {
+            return "Intersect";
+        }
+
+        @Override
         public IFrameWriter getInputFrameWriter(final int index) {
             return new IFrameWriter() {
                 private final int[] normalizedKey1 =
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index ebe871b..38320ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -30,7 +30,6 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
 
@@ -79,11 +78,9 @@
             @Override
             protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescProvider) throws HyracksDataException {
-                final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
                 IRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories,
                         comparatorFactories, outRecDescs[0], alg, policy, framesLimit, outputLimit);
-                return profile ? ProfiledRunGenerator.time(runGen, ctx, "ExternalSort(Sort)", this.getActivityId())
-                        : runGen;
+                return runGen;
             }
         };
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java
deleted file mode 100644
index 5cc1882..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.sort;
-
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.profiling.IOperatorStats;
-import org.apache.hyracks.api.job.profiling.IStatsCollector;
-import org.apache.hyracks.api.job.profiling.OperatorStats;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
-
-public class ProfiledRunGenerator extends ProfiledFrameWriter implements IRunGenerator {
-
-    private final IRunGenerator runGenerator;
-
-    private ProfiledRunGenerator(IRunGenerator runGenerator, IStatsCollector collector, String name,
-            IOperatorStats stats, ActivityId root) {
-        super(runGenerator, collector, name, stats, null);
-        this.runGenerator = runGenerator;
-    }
-
-    @Override
-    public List<GeneratedRunFileReader> getRuns() {
-        return runGenerator.getRuns();
-    }
-
-    @Override
-    public ISorter getSorter() {
-        return runGenerator.getSorter();
-    }
-
-    public static IRunGenerator time(IRunGenerator runGenerator, IHyracksTaskContext ctx, String name, ActivityId root)
-            throws HyracksDataException {
-        if (!(runGenerator instanceof ProfiledRunGenerator)) {
-            String statName = root.toString() + " - " + name;
-            IStatsCollector statsCollector = ctx.getStatsCollector();
-            IOperatorStats stats = new OperatorStats(statName);
-            statsCollector.add(stats);
-            return new ProfiledRunGenerator(runGenerator, ctx.getStatsCollector(), name, stats, root);
-        }
-        return runGenerator;
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index 2322910..934ae60 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -29,9 +29,7 @@
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 
 public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescriptor {
@@ -63,16 +61,9 @@
             @Override
             protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescProvider) {
-                final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
                 IRunGenerator runGen = new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields,
                         keyNormalizerFactories, comparatorFactories, outRecDescs[0]);
-                try {
-                    return profile ? ProfiledRunGenerator.time(runGen, ctx, "TopKSort (Sort)", this.getActivityId())
-                            : runGen;
-                } catch (HyracksDataException e) {
-                    e.printStackTrace();
-                }
-                return null;
+                return runGen;
             }
         };
     }