Merge "Merge branch 'trinity' into 'master'"
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index bb93fbc..9d94327 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -114,6 +114,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Provides helper methods for compilation of a query into a JobSpec and submission
@@ -131,6 +132,7 @@
private final IRuleSetFactory ruleSetFactory;
private final Set<String> configurableParameterNames;
private final ExecutionPlans executionPlans;
+ private PlanInfo lastPlan;
public APIFramework(ILangCompilationProvider compilationProvider) {
this.rewriterFactory = compilationProvider.getRewriterFactory();
@@ -139,6 +141,22 @@
this.ruleSetFactory = compilationProvider.getRuleSetFactory();
this.configurableParameterNames = compilationProvider.getCompilerOptions();
executionPlans = new ExecutionPlans();
+ lastPlan = null;
+ }
+
+ private class PlanInfo {
+ ILogicalPlan plan;
+ Map<Object, String> log2Phys;
+ boolean printOptimizerEstimates;
+ SessionConfig.PlanFormat format;
+
+ public PlanInfo(ILogicalPlan plan, Map<Object, String> log2Phys, boolean printOptimizerEstimates,
+ SessionConfig.PlanFormat format) {
+ this.plan = plan;
+ this.log2Phys = log2Phys;
+ this.printOptimizerEstimates = printOptimizerEstimates;
+ this.format = format;
+ }
}
private static class OptimizationContextFactory implements IOptimizationContextFactory {
@@ -328,6 +346,7 @@
if (isQuery || isLoad || isCopy) {
generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(),
cboMode);
+ lastPlan = new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
}
}
@@ -536,6 +555,20 @@
getPrettyPrintVisitor(format).printPlan(plan, log2phys, printOptimizerEstimates).toString());
}
+ public void generateOptimizedLogicalPlanWithProfile(ObjectNode profile) throws HyracksDataException {
+ /*TODO(ian): we call this and overwrite the non-annotated plan, but there should be some way to skip initial
+ plan printing if both profiling and plan printing are requested. */
+ try {
+ if (lastPlan != null) {
+ executionPlans.setOptimizedLogicalPlan(getPrettyPrintVisitor(lastPlan.format)
+ .printPlan(lastPlan.plan, lastPlan.log2Phys, lastPlan.printOptimizerEstimates, profile)
+ .toString());
+ }
+ } catch (AlgebricksException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
private void generateOptimizedLogicalPlan(ILogicalPlan plan, SessionConfig.PlanFormat format,
boolean printOptimizerEstimates) throws AlgebricksException {
executionPlans.setOptimizedLogicalPlan(
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 7cf7938..3456427 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4727,6 +4727,7 @@
stats.setProcessedObjects(resultMetadata.getProcessedObjects());
if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) {
stats.setJobProfile(resultMetadata.getJobProfile());
+ apiFramework.generateOptimizedLogicalPlanWithProfile(resultMetadata.getJobProfile());
}
stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
index b056d2c..56169e7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
@@ -57,7 +57,7 @@
@Parameters(name = "SqlppProfiledExecutionTest {index}: {0}")
public static Collection<Object[]> tests() throws Exception {
- return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_sqlpp_profiled.xml");
+ return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_sqlpp.xml");
}
protected TestCaseContext tcCtx;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
index 0c4fae1..88107b0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -34,12 +34,12 @@
"counters": [
{
"name": "R{.+}",
- "time": "R{[0-9.]+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}"
},
{
"name": "R{.+}",
- "time": "R{[0-9.]+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"pages-read": "R{[0-9.]+}",
"pages-read-cold": "R{[0-9.]+}",
@@ -50,7 +50,7 @@
},
{
"name": "R{.+}",
- "time": "R{[0-9.]+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"cardinality-out": "R{[0-9.]+}",
"avg-tuple-size": "R{[0-9.]+}",
@@ -67,7 +67,7 @@
"counters": [
{
"name": "R{.+}",
- "time": "R{[0-9.]+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"cardinality-out": "R{[0-9.]+}",
"avg-tuple-size": "R{[0-9.]+}",
@@ -76,7 +76,7 @@
},
{
"name": "R{.+}",
- "time": "R{[0-9.]+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}"
}
]
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index 0d8891c..aba35f4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -111,5 +111,9 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
index 2b53de4..9f138b5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
@@ -25,6 +25,8 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
/**
* Note: Some implementations may be stateful and not thread-safe.
*/
@@ -49,6 +51,10 @@
IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys, boolean printOptimizerEstimates)
throws AlgebricksException;
+ /** Prints the logical plan, annotated with physical operator and connector ids, and profiling info*/
+ IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys, boolean printOptimizerEstimates,
+ ObjectNode profile) throws AlgebricksException;
+
/** Resets the state of the pretty printer. */
IPlanPrettyPrinter reset() throws AlgebricksException;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index e7fc5f9..f49b6d4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -74,6 +74,8 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPrettyPrintVisitor<Integer>
implements IPlanPrettyPrinter {
@@ -106,6 +108,14 @@
}
@Override
+ public IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys,
+ boolean printOptimizerEstimates, ObjectNode profile) throws AlgebricksException {
+ //TODO(ian): add times
+ printPlanImpl(plan, 0, printOptimizerEstimates);
+ return this;
+ }
+
+ @Override
public final IPlanPrettyPrinter printOperator(AbstractLogicalOperator op, boolean printInputs,
boolean printOptimizerEstimates) throws AlgebricksException {
printOperatorImpl(op, 0, printInputs, printOptimizerEstimates);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 4360c67..6f65a9d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -80,12 +80,15 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.exceptions.ErrorCode;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.DefaultIndenter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperatorPrettyPrintVisitor<Void>
implements IPlanPrettyPrinter {
@@ -102,6 +105,7 @@
private static final String OPTIMIZER_ESTIMATES = "optimizer-estimates";
private final Map<AbstractLogicalOperator, String> operatorIdentity = new HashMap<>();
private Map<Object, String> log2odid = Collections.emptyMap();
+ private Map<String, ProfileInfo> profile = Collections.emptyMap();
private final IdCounter idCounter = new IdCounter();
private final JsonGenerator jsonGenerator;
@@ -149,6 +153,50 @@
}
}
+ private class ProfileInfo {
+ Map<Integer, Pair<Double, Double>> activities;
+
+ ProfileInfo() {
+ activities = new HashMap<>();
+ }
+
+ void visit(int id, double time) {
+ Pair<Double, Double> times = activities.computeIfAbsent(id, i -> new Pair(time, time));
+ if (times.getFirst() > time) {
+ times.setFirst(time);
+ }
+ if (times.getSecond() < time) {
+ times.setSecond(time);
+ }
+ }
+ }
+
+ private static ActivityId acIdFromName(String name) {
+ String[] parts = name.split(" - ");
+ return ActivityId.parse(parts[0]);
+ }
+
+ Map<String, ProfileInfo> processProfile(ObjectNode profile) {
+ Map<String, ProfileInfo> profiledOps = new HashMap<>();
+ for (JsonNode joblet : profile.get("joblets")) {
+ for (JsonNode task : joblet.get("tasks")) {
+ for (JsonNode counters : task.get("counters")) {
+ ProfileInfo info =
+ profiledOps.computeIfAbsent(counters.get("runtime-id").asText(), i -> new ProfileInfo());
+ info.visit(acIdFromName(counters.get("name").asText()).getLocalId(),
+ counters.get("run-time").asDouble());
+ }
+ for (JsonNode partition : task.get("partition-send-profile")) {
+ String id = partition.get("partition-id").get("connector-id").asText();
+ ProfileInfo info = profiledOps.computeIfAbsent(id, i -> new ProfileInfo());
+ //CDIDs are unique
+ info.visit(0, partition.get("close-time").asDouble() - partition.get("open-time").asDouble());
+ }
+ }
+ }
+ return profiledOps;
+ }
+
@Override
public final IPlanPrettyPrinter reset() throws AlgebricksException {
flushContentToWriter();
@@ -175,6 +223,16 @@
}
@Override
+ public IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys,
+ boolean printOptimizerEstimates, ObjectNode profile) throws AlgebricksException {
+ this.log2odid = log2phys;
+ this.profile = processProfile(profile);
+ printPlanImpl(plan, printOptimizerEstimates);
+ flushContentToWriter();
+ return this;
+ }
+
+ @Override
public final IPlanPrettyPrinter printOperator(AbstractLogicalOperator op, boolean printInputs,
boolean printOptimizerEstimates) throws AlgebricksException {
printOperatorImpl(op, printInputs, printOptimizerEstimates);
@@ -208,6 +266,23 @@
String od = log2odid.get(op);
if (od != null) {
jsonGenerator.writeStringField("runtime-id", od);
+ ProfileInfo info = profile.get(od);
+ if (info != null) {
+ if (info.activities.size() == 1) {
+ jsonGenerator.writeNumberField("min-time", info.activities.get(0).first);
+ jsonGenerator.writeNumberField("max-time", info.activities.get(0).second);
+ } else {
+ jsonGenerator.writeObjectFieldStart("times");
+ for (Map.Entry<Integer, Pair<Double, Double>> ac : info.activities.entrySet()) {
+ jsonGenerator.writeObjectFieldStart(ac.getKey().toString());
+ jsonGenerator.writeNumberField("min-time", ac.getValue().first);
+ jsonGenerator.writeNumberField("max-time", ac.getValue().second);
+ jsonGenerator.writeEndObject();
+ }
+ jsonGenerator.writeEndObject();
+ }
+
+ }
}
IPhysicalOperator pOp = op.getPhysicalOperator();
if (pOp != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
index 5b9495a..dc53bca 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
import org.apache.hyracks.api.comm.FrameConstants;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -29,129 +30,96 @@
import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.OperatorStats;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
+import org.apache.hyracks.api.util.HyracksRunnable;
+import org.apache.hyracks.api.util.HyracksThrowingConsumer;
import org.apache.hyracks.util.IntSerDeUtils;
-public class ProfiledFrameWriter implements IFrameWriter, IPassableTimer {
+public class ProfiledFrameWriter implements IFrameWriter {
// The downstream data consumer of this writer.
private final IFrameWriter writer;
- private long frameStart = 0;
- final ICounter timeCounter;
- final ICounter tupleCounter;
- final IStatsCollector collector;
- final IOperatorStats stats;
- final IOperatorStats parentStats;
+ private final ICounter tupleCounter;
+ private final IOperatorStats parentStats;
private int minSz = Integer.MAX_VALUE;
private int maxSz = -1;
private long avgSz;
- final String name;
+ private ICounter totalTime;
- public ProfiledFrameWriter(IFrameWriter writer, IStatsCollector collector, String name, IOperatorStats stats,
- IOperatorStats parentStats) {
+ public ProfiledFrameWriter(IFrameWriter writer, IOperatorStats parentStats) {
this.writer = writer;
- this.collector = collector;
- this.name = name;
- this.stats = stats;
this.parentStats = parentStats;
- this.timeCounter = stats.getTimeCounter();
this.tupleCounter = parentStats != null ? parentStats.getTupleCounter() : null;
+ this.totalTime = new Counter("totalTime");
+ }
+
+ public static void timeMethod(HyracksRunnable r, ICounter c) throws HyracksDataException {
+ long nt = 0;
+ try {
+ nt = System.nanoTime();
+ r.run();
+ } finally {
+ c.update(System.nanoTime() - nt);
+ }
+ }
+
+ private void timeMethod(HyracksThrowingConsumer<ByteBuffer> c, ByteBuffer buffer) throws HyracksDataException {
+ long nt = 0;
+ try {
+ nt = System.nanoTime();
+ c.accept(buffer);
+ } finally {
+ totalTime.update(System.nanoTime() - nt);
+ }
}
@Override
public final void open() throws HyracksDataException {
- try {
- startClock();
- writer.open();
- } finally {
- stopClock();
+ timeMethod(writer::open, totalTime);
+ }
+
+ private void updateTupleStats(ByteBuffer buffer) {
+ int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
+ int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
+ if (tupleCounter != null) {
+ long prevCount = tupleCounter.get();
+ for (int i = 0; i < tupleCount; i++) {
+ int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
+ if (maxSz < tupleLen) {
+ maxSz = tupleLen;
+ }
+ if (minSz > tupleLen) {
+ minSz = tupleLen;
+ }
+ long prev = avgSz * prevCount;
+ avgSz = (prev + tupleLen) / (prevCount + 1);
+ prevCount++;
+ }
+ parentStats.getMaxTupleSz().set(maxSz);
+ parentStats.getMinTupleSz().set(minSz);
+ parentStats.getAverageTupleSz().set(avgSz);
+ tupleCounter.update(tupleCount);
}
}
@Override
public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- try {
- int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
- int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
- if (tupleCounter != null) {
- long prevCount = tupleCounter.get();
- for (int i = 0; i < tupleCount; i++) {
- int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
- if (maxSz < tupleLen) {
- maxSz = tupleLen;
- }
- if (minSz > tupleLen) {
- minSz = tupleLen;
- }
- long prev = avgSz * prevCount;
- avgSz = (prev + tupleLen) / (prevCount + 1);
- prevCount++;
- }
- parentStats.getMaxTupleSz().set(maxSz);
- parentStats.getMinTupleSz().set(minSz);
- parentStats.getAverageTupleSz().set(avgSz);
- tupleCounter.update(tupleCount);
- }
- startClock();
- writer.nextFrame(buffer);
- } finally {
- stopClock();
- }
+ updateTupleStats(buffer);
+ timeMethod(writer::nextFrame, buffer);
}
@Override
public final void flush() throws HyracksDataException {
- try {
- startClock();
- writer.flush();
- } finally {
- stopClock();
- }
+ timeMethod(writer::flush, totalTime);
}
@Override
public final void fail() throws HyracksDataException {
- writer.fail();
+ timeMethod(writer::fail, totalTime);
}
@Override
public void close() throws HyracksDataException {
- try {
- startClock();
- writer.close();
- } finally {
- stopClock();
- }
- }
-
- private void stopClock() {
- pause();
- collector.giveClock(this);
- }
-
- private void startClock() {
- if (frameStart > 0) {
- return;
- }
- frameStart = collector.takeClock(this);
- }
-
- @Override
- public void resume() {
- if (frameStart > 0) {
- return;
- }
- long nt = System.nanoTime();
- frameStart = nt;
- }
-
- @Override
- public void pause() {
- if (frameStart > 1) {
- long nt = System.nanoTime();
- long delta = nt - frameStart;
- timeCounter.update(delta);
- frameStart = -1;
- }
+ timeMethod(writer::close, totalTime);
}
private int getTupleStartOffset(int tupleIndex, int tupleCountOffset, ByteBuffer buffer) {
@@ -174,9 +142,14 @@
IStatsCollector statsCollector = ctx.getStatsCollector();
IOperatorStats stats = new OperatorStats(name);
statsCollector.add(stats);
- return new ProfiledFrameWriter(writer, ctx.getStatsCollector(), name, stats, null);
+ return new ProfiledFrameWriter(writer, null);
} else
return writer;
}
+
+ public long getTotalTime() {
+ return totalTime.get();
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
index f787a1c..bde5611 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
@@ -19,7 +19,9 @@
package org.apache.hyracks.api.dataflow;
import java.util.HashMap;
+import java.util.Map;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -27,43 +29,45 @@
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.OperatorStats;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.api.rewriter.runtime.SuperActivityOperatorNodePushable;
-public class ProfiledOperatorNodePushable extends ProfiledFrameWriter implements IOperatorNodePushable, IPassableTimer {
+public class ProfiledOperatorNodePushable implements IOperatorNodePushable {
- IOperatorNodePushable op;
- ProfiledOperatorNodePushable parentOp;
- ActivityId acId;
- HashMap<Integer, IFrameWriter> inputs;
- long frameStart;
+ private final IOperatorNodePushable op;
+ private final Map<Integer, ProfiledFrameWriter> inputs;
+ private final Map<Integer, ProfiledOperatorNodePushable> parents;
+ private final Map<Integer, ProfiledFrameWriter> outputs;
+ private final IOperatorStats stats;
+ private final ICounter totalTime;
- ProfiledOperatorNodePushable(IOperatorNodePushable op, ActivityId acId, IStatsCollector collector,
- IOperatorStats stats, ActivityId parent, ProfiledOperatorNodePushable parentOp)
- throws HyracksDataException {
- super(null, collector, acId.toString() + " - " + op.getDisplayName(), stats,
- parentOp != null ? parentOp.getStats() : null);
- this.parentOp = parentOp;
+ ProfiledOperatorNodePushable(IOperatorNodePushable op, IOperatorStats stats,
+ ProfiledOperatorNodePushable parentOp) {
+ this.stats = stats;
+ this.parents = new HashMap<>();
+ parents.put(0, parentOp);
this.op = op;
- this.acId = acId;
inputs = new HashMap<>();
+ outputs = new HashMap<>();
+ this.totalTime = new Counter("totalTime");
}
@Override
public void initialize() throws HyracksDataException {
- synchronized (collector) {
- startClock();
- op.initialize();
- stopClock();
- }
+ ProfiledFrameWriter.timeMethod(op::initialize, totalTime);
}
@Override
public void deinitialize() throws HyracksDataException {
- synchronized (collector) {
- startClock();
- op.deinitialize();
- stopClock();
+ long ownTime = totalTime.get();
+ for (ProfiledFrameWriter i : inputs.values()) {
+ ownTime += i.getTotalTime();
}
+ for (ProfiledFrameWriter w : outputs.values()) {
+ ownTime -= w.getTotalTime();
+ }
+ op.deinitialize();
+ stats.getTimeCounter().set(ownTime);
}
@Override
@@ -74,17 +78,23 @@
@Override
public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
throws HyracksDataException {
+ if (writer instanceof ProfiledFrameWriter) {
+ ProfiledFrameWriter wrapper = (ProfiledFrameWriter) writer;
+ outputs.put(index, wrapper);
+ }
op.setOutputFrameWriter(index, writer, recordDesc);
}
@Override
public IFrameWriter getInputFrameWriter(int index) {
- IFrameWriter ifw = op.getInputFrameWriter(index);
- if (!(op instanceof ProfiledFrameWriter) && ifw.equals(op)) {
- return new ProfiledFrameWriter(op.getInputFrameWriter(index), collector,
- acId.toString() + "-" + op.getDisplayName(), stats, parentStats);
+ if (inputs.get(index) == null) {
+ IOperatorStats parentStats = parents.get(index) == null ? null : parents.get(index).getStats();
+ ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index), parentStats);
+ inputs.put(index, pfw);
+ return pfw;
+ } else {
+ return inputs.get(index);
}
- return op.getInputFrameWriter(index);
}
@Override
@@ -92,45 +102,14 @@
return op.getDisplayName();
}
- private void stopClock() {
- pause();
- collector.giveClock(this);
- }
-
- private void startClock() {
- if (frameStart > 0) {
- return;
- }
- frameStart = collector.takeClock(this);
- }
-
- @Override
- public void resume() {
- if (frameStart > 0) {
- return;
- }
- long nt = System.nanoTime();
- frameStart = nt;
- }
-
- @Override
- public void pause() {
- if (frameStart > 0) {
- long nt = System.nanoTime();
- long delta = nt - frameStart;
- timeCounter.update(delta);
- frameStart = -1;
- }
+ public void addParent(int index, ProfiledOperatorNodePushable parent) {
+ parents.put(index, parent);
}
public IOperatorStats getStats() {
return stats;
}
- public IOperatorStats getParentStats() {
- return parentStats;
- }
-
public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId,
ProfiledOperatorNodePushable source) throws HyracksDataException {
String name = acId.toString() + " - " + op.getDisplayName();
@@ -141,7 +120,7 @@
((IIntrospectingOperator) op).setOperatorStats(stats);
}
if (!(op instanceof ProfiledOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) {
- return new ProfiledOperatorNodePushable(op, acId, ctx.getStatsCollector(), stats, acId, source);
+ return new ProfiledOperatorNodePushable(op, stats, source);
}
return op;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
index d2ad20c..3169c81 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
@@ -21,7 +21,6 @@
import java.io.Serializable;
import java.util.Map;
-import org.apache.hyracks.api.dataflow.IPassableTimer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IWritable;
@@ -54,17 +53,4 @@
*/
IOperatorStats getAggregatedStats();
- /**
- * Pause an operator's timer, to pass it to another operator
- * @param newHolder the timer that is starting execution
- * @return the current nanoTime when the clock was taken from the other operator
- */
- long takeClock(IPassableTimer newHolder);
-
- /**
- * Resume an operator's timer, when a downstream operator has finished execution of
- * the method the upstream operator called
- * @param currHolder the timer that needs to be paused
- */
- void giveClock(IPassableTimer currHolder);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index efd4e07..de28318 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -158,6 +158,12 @@
}
operatorNodePushablesBFSOrder.add(destOp);
operatorNodePushables.put(destId, destOp);
+ } else if (profile) {
+ if (destOp instanceof ProfiledOperatorNodePushable
+ && sourceOp instanceof ProfiledOperatorNodePushable) {
+ ((ProfiledOperatorNodePushable) destOp).addParent(inputChannel,
+ (ProfiledOperatorNodePushable) sourceOp);
+ }
}
/*
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
similarity index 64%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
index 6afbccb..a82c150 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
@@ -16,17 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataflow;
+package org.apache.hyracks.api.util;
-public interface IPassableTimer {
- /*
- A timer intended to be used for timing the individual components of a
- pipelined process. An instance of IPassableTimer is held by each method
- in the pipeline, and is paused() when that method passes off control to
- a component above it, and is resume()d when the component above it returns.
- */
+import org.apache.hyracks.api.exceptions.HyracksDataException;
- void pause();
-
- void resume();
+@FunctionalInterface
+public interface HyracksRunnable {
+ void run() throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
index 41beac0..76c8017 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -21,23 +21,19 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayDeque;
import java.util.Collections;
-import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
-import org.apache.hyracks.api.dataflow.IPassableTimer;
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
import org.apache.hyracks.api.job.profiling.OperatorStats;
public class StatsCollector implements IStatsCollector {
- private static final long serialVersionUID = 6858817639895434572L;
+ private static final long serialVersionUID = 6858817639895434573L;
private final Map<String, IOperatorStats> operatorStatsMap = new LinkedHashMap<>();
- private transient Deque<IPassableTimer> clockHolder = new ArrayDeque<>();
@Override
public void add(IOperatorStats operatorStats) {
@@ -91,23 +87,4 @@
}
}
- @Override
- public long takeClock(IPassableTimer newHolder) {
- if (newHolder != null) {
- if (clockHolder.peek() != null) {
- clockHolder.peek().pause();
- }
- clockHolder.push(newHolder);
- }
- return System.nanoTime();
- }
-
- @Override
- public void giveClock(IPassableTimer currHolder) {
- clockHolder.removeLastOccurrence(currHolder);
- if (clockHolder.peek() != null) {
- clockHolder.peek().resume();
- }
- }
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 4036f00..546360a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -128,7 +128,7 @@
opTimes.forEach((key, value) -> {
ObjectNode jpe = om.createObjectNode();
jpe.put("name", key);
- jpe.put("time", Double
+ jpe.put("run-time", Double
.parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000)));
if (value.getId().getId() >= 0) {
jpe.put("runtime-id", value.getId().toString());
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index 815536b..6e58e9c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -30,14 +30,12 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.Algorithm;
import org.apache.hyracks.dataflow.std.sort.IRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.ProfiledRunGenerator;
/**
* This Operator pushes group-by aggregation into the external sort.
@@ -143,13 +141,11 @@
@Override
protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescriptorProvider) throws HyracksDataException {
- final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
IRunGenerator runGen = new ExternalSortGroupByRunGenerator(ctx, sortFields,
recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), framesLimit,
groupFields, keyNormalizerFactories, comparatorFactories, partialAggregatorFactory,
partialAggRecordDesc, ALG);
- return profile ? ProfiledRunGenerator.time(runGen, ctx, "GroupBy (Sort Runs)", this.getActivityId())
- : runGen;
+ return runGen;
}
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
index 67b0686..dc2b46f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
@@ -224,6 +224,11 @@
}
@Override
+ public String getDisplayName() {
+ return "Intersect";
+ }
+
+ @Override
public IFrameWriter getInputFrameWriter(final int index) {
return new IFrameWriter() {
private final int[] normalizedKey1 =
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index ebe871b..38320ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -30,7 +30,6 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
@@ -79,11 +78,9 @@
@Override
protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider) throws HyracksDataException {
- final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
IRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories,
comparatorFactories, outRecDescs[0], alg, policy, framesLimit, outputLimit);
- return profile ? ProfiledRunGenerator.time(runGen, ctx, "ExternalSort(Sort)", this.getActivityId())
- : runGen;
+ return runGen;
}
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java
deleted file mode 100644
index 5cc1882..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.sort;
-
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.profiling.IOperatorStats;
-import org.apache.hyracks.api.job.profiling.IStatsCollector;
-import org.apache.hyracks.api.job.profiling.OperatorStats;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
-
-public class ProfiledRunGenerator extends ProfiledFrameWriter implements IRunGenerator {
-
- private final IRunGenerator runGenerator;
-
- private ProfiledRunGenerator(IRunGenerator runGenerator, IStatsCollector collector, String name,
- IOperatorStats stats, ActivityId root) {
- super(runGenerator, collector, name, stats, null);
- this.runGenerator = runGenerator;
- }
-
- @Override
- public List<GeneratedRunFileReader> getRuns() {
- return runGenerator.getRuns();
- }
-
- @Override
- public ISorter getSorter() {
- return runGenerator.getSorter();
- }
-
- public static IRunGenerator time(IRunGenerator runGenerator, IHyracksTaskContext ctx, String name, ActivityId root)
- throws HyracksDataException {
- if (!(runGenerator instanceof ProfiledRunGenerator)) {
- String statName = root.toString() + " - " + name;
- IStatsCollector statsCollector = ctx.getStatsCollector();
- IOperatorStats stats = new OperatorStats(statName);
- statsCollector.add(stats);
- return new ProfiledRunGenerator(runGenerator, ctx.getStatsCollector(), name, stats, root);
- }
- return runGenerator;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index 2322910..934ae60 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -29,9 +29,7 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescriptor {
@@ -63,16 +61,9 @@
@Override
protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider) {
- final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
IRunGenerator runGen = new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields,
keyNormalizerFactories, comparatorFactories, outRecDescs[0]);
- try {
- return profile ? ProfiledRunGenerator.time(runGen, ctx, "TopKSort (Sort)", this.getActivityId())
- : runGen;
- } catch (HyracksDataException e) {
- e.printStackTrace();
- }
- return null;
+ return runGen;
}
};
}