[NEMO-324] Distinguish Beam's run and waitUntilFinish methods (#187)
JIRA: [NEMO-324: Distinguish Beam's run and waitUntilFinish methods](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-324)
**Major changes:**
- Removes the `UnsupportedOperationException` and gives the `waitUntilFinish` method a meaningful implementation.
- Runs the application asynchronously in the `run` method.
**Minor changes to note:**
- Add the examples output directory to the gitignore.
**Tests for the changes:**
- Added a copy of the word count program that has a timeout of 1 second, and a test to confirm that it aborts after 1 second.
- Existing tests cover other changes.
**Other comments:**
- None
Closes #187
diff --git a/.gitignore b/.gitignore
index 069c859..1795a60 100644
--- a/.gitignore
+++ b/.gitignore
@@ -50,6 +50,7 @@
# ----------------------------------------------------------------------
# Temporary Files
# ----------------------------------------------------------------------
+outputs/*
tmp
*~
\#*
diff --git a/client/src/main/java/org/apache/nemo/client/ClientEndpoint.java b/client/src/main/java/org/apache/nemo/client/ClientEndpoint.java
index 257d23a..750b210 100644
--- a/client/src/main/java/org/apache/nemo/client/ClientEndpoint.java
+++ b/client/src/main/java/org/apache/nemo/client/ClientEndpoint.java
@@ -109,20 +109,7 @@
* @return {@code true} if the manager set.
*/
private boolean waitUntilConnected() {
- connectionLock.lock();
- try {
- if (driverEndpoint.get() == null) {
- // If the driver endpoint is not connected, wait.
- driverConnected.await();
- }
- return true;
- } catch (final InterruptedException e) {
- e.printStackTrace(System.err);
- Thread.currentThread().interrupt();
- return false;
- } finally {
- connectionLock.unlock();
- }
+ return waitUntilConnected(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
/**
@@ -153,19 +140,16 @@
} else {
// The driver endpoint is not connected yet.
final long currentNano = System.nanoTime();
- final boolean driverIsConnected;
- if (DEFAULT_DRIVER_WAIT_IN_MILLIS < unit.toMillis(timeout)) {
- driverIsConnected = waitUntilConnected(DEFAULT_DRIVER_WAIT_IN_MILLIS, TimeUnit.MILLISECONDS);
- } else {
- driverIsConnected = waitUntilConnected(timeout, unit);
- }
+ final boolean driverIsConnected =
+ waitUntilConnected(Math.min(DEFAULT_DRIVER_WAIT_IN_MILLIS, unit.toMillis(timeout)), TimeUnit.MILLISECONDS);
if (driverIsConnected) {
final long consumedTime = System.nanoTime() - currentNano;
return stateTranslator.translateState(driverEndpoint.get().
waitUntilFinish(timeout - unit.convert(consumedTime, TimeUnit.NANOSECONDS), unit));
} else {
- return PlanState.State.READY;
+ // Driver is not connected.
+ return stateTranslator.translateState(PlanState.State.READY);
}
}
}
@@ -176,17 +160,6 @@
* @return the final state of this job.
*/
public final Enum waitUntilJobFinish() {
- if (driverEndpoint.get() != null) {
- return stateTranslator.translateState(driverEndpoint.get().waitUntilFinish());
- } else {
- // The driver endpoint is not connected yet.
- final boolean driverIsConnected = waitUntilConnected();
-
- if (driverIsConnected) {
- return stateTranslator.translateState(driverEndpoint.get().waitUntilFinish());
- } else {
- return PlanState.State.READY;
- }
- }
+ // Delegates to the timed overload with the largest representable timeout.
+ // NOTE(review): the timed overload caps the wait for the driver connection at DEFAULT_DRIVER_WAIT_IN_MILLIS,
+ // whereas the removed code waited for the connection without a limit - confirm this change is intended.
+ return waitUntilJobFinish(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
}
diff --git a/client/src/main/java/org/apache/nemo/client/DriverEndpoint.java b/client/src/main/java/org/apache/nemo/client/DriverEndpoint.java
index 6deb8e6..35edea7 100644
--- a/client/src/main/java/org/apache/nemo/client/DriverEndpoint.java
+++ b/client/src/main/java/org/apache/nemo/client/DriverEndpoint.java
@@ -83,6 +83,6 @@
* @return the final state of this plan.
*/
PlanState.State waitUntilFinish() {
- return planStateManager.waitUntilFinish();
+ return waitUntilFinish(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
}
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 9873f16..acd9a22 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -86,6 +86,7 @@
private static DriverLauncher driverLauncher;
private static DriverRPCServer driverRPCServer;
+ private static boolean isSetUp = false;
private static CountDownLatch driverReadyLatch;
private static CountDownLatch jobDoneLatch;
private static String serializedDAG;
@@ -170,6 +171,8 @@
// Launch driver
LOG.info("Launching driver");
driverReadyLatch = new CountDownLatch(1);
+ jobDoneLatch = new CountDownLatch(1);
+ isSetUp = true;
driverLauncher = DriverLauncher.getLauncher(deployModeConf);
driverLauncher.submit(jobAndDriverConf, 500);
// When the driver is up and the resource is ready, the DriverReady message is delivered.
@@ -201,9 +204,12 @@
// Close everything that's left
driverRPCServer.shutdown();
driverLauncher.close();
+ isSetUp = false;
final Optional<Throwable> possibleError = driverLauncher.getStatus().getError();
if (possibleError.isPresent()) {
throw new RuntimeException(possibleError.get());
+ } else if (jobDoneLatch.getCount() > 0) {
+ LOG.info("Job cancelled");
} else {
LOG.info("Job successfully completed");
}
@@ -261,7 +267,7 @@
final Map<Serializable, Object> broadcastVariables,
final String jobId) {
// launch driver if it hasn't been already
- if (driverReadyLatch == null) {
+ if (!isSetUp) {
try {
setup(new String[]{"-job_id", jobId});
} catch (Exception e) {
@@ -281,7 +287,9 @@
LOG.info("Launching DAG...");
serializedDAG = Base64.getEncoder().encodeToString(SerializationUtils.serialize(dag));
- jobDoneLatch = new CountDownLatch(1);
+ if (jobDoneLatch.getCount() == 0) { // when this is not the first execution.
+ jobDoneLatch = new CountDownLatch(1);
+ }
driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
.setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
diff --git a/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java b/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
index d983521..d360699 100644
--- a/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
@@ -21,20 +21,35 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.nemo.client.ClientEndpoint;
+import org.apache.nemo.client.JobLauncher;
import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* Beam result.
*/
public final class NemoPipelineResult extends ClientEndpoint implements PipelineResult {
+ private static final Logger LOG = LoggerFactory.getLogger(NemoPipelineResult.class.getName());
+ private final CountDownLatch jobDone;
/**
* Default constructor.
*/
public NemoPipelineResult() {
super(new BeamStateTranslator());
+ // Starts at a count of one; setJobDone() counts it down.
+ this.jobDone = new CountDownLatch(1);
+ }
+
+ /**
+ * Signal that the job is finished to the NemoPipelineResult object.
+ * Counts down the {@code jobDone} latch, which is the latch awaited by {@link #waitUntilFinish(Duration)}.
+ */
+ public void setJobDone() {
+ this.jobDone.countDown();
}
@Override
@@ -44,14 +59,35 @@
@Override
public State cancel() throws IOException {
- throw new UnsupportedOperationException("cancel() in frontend.beam.NemoPipelineResult");
+ try {
+ // JobLauncher.shutdown() is a static call, so it is not scoped to this particular result object.
+ JobLauncher.shutdown();
+ return State.CANCELLED;
+ } catch (Exception e) {
+ // Any failure during shutdown is rethrown unchecked, with the original exception as its cause.
+ throw new RuntimeException(e);
+ }
}
@Override
public State waitUntilFinish(final Duration duration) {
- throw new UnsupportedOperationException();
- // TODO #208: NemoPipelineResult#waitUntilFinish hangs
- // Previous code that hangs the job:
+ // Waits on the jobDone latch, which setJobDone() counts down.
+ // A duration shorter than 1ms is treated as "no timeout" and blocks until the latch is released.
+ final long timeoutMillis = duration.getMillis();
+ try {
+ if (timeoutMillis < 1) {
+ this.jobDone.await();
+ return State.DONE;
+ }
+ if (this.jobDone.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
+ LOG.info("Job successfully finished before timeout of {}ms, while waiting until finish",
+ timeoutMillis);
+ return State.DONE;
+ }
+ // The latch was not released in time: the job is reported as still running.
+ LOG.warn("Job timed out before {}ms, while waiting until finish. Call 'cancel' to cancel the job.",
+ timeoutMillis);
+ return State.RUNNING;
+ } catch (final InterruptedException e) {
+ // Restore the interrupt flag before rethrowing, so that callers can still observe the interruption.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
// return (State) super.waitUntilJobFinish(duration.getMillis(), TimeUnit.MILLISECONDS);
}
diff --git a/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
index 81aa18d..bf3323d 100644
--- a/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
@@ -27,6 +27,8 @@
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.apache.nemo.compiler.frontend.beam.PipelineVisitor;
+import java.util.concurrent.CompletableFuture;
+
/**
* Runner class for BEAM programs.
*/
@@ -84,7 +86,9 @@
final PipelineVisitor pipelineVisitor = new PipelineVisitor(pipeline, nemoPipelineOptions);
pipeline.traverseTopologically(pipelineVisitor);
final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult();
- JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline(), nemoPipelineOptions.getJobName());
+ CompletableFuture.runAsync(() ->
+ JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline(), nemoPipelineOptions.getJobName()))
+ .thenRun(nemoPipelineResult::setJobDone);
return nemoPipelineResult;
}
}
diff --git a/common/src/main/java/org/apache/nemo/common/exception/OutputMismatchException.java b/common/src/main/java/org/apache/nemo/common/exception/OutputMismatchException.java
new file mode 100644
index 0000000..70e6d5c
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/exception/OutputMismatchException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.common.exception;
+
+/**
+ * Unchecked exception signalling an output mismatch.
+ * Raised by ITCases when the produced output differs from the expected output.
+ */
+public class OutputMismatchException extends RuntimeException {
+  /**
+   * Creates an OutputMismatchException carrying a description.
+   *
+   * @param message the detail message.
+   */
+  public OutputMismatchException(final String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an OutputMismatchException wrapping an underlying failure.
+   *
+   * @param cause the underlying cause.
+   */
+  public OutputMismatchException(final Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/common/src/main/java/org/apache/nemo/common/test/ExampleTestUtil.java b/common/src/main/java/org/apache/nemo/common/test/ExampleTestUtil.java
index 08691af..6a41449 100644
--- a/common/src/main/java/org/apache/nemo/common/test/ExampleTestUtil.java
+++ b/common/src/main/java/org/apache/nemo/common/test/ExampleTestUtil.java
@@ -18,6 +18,8 @@
*/
package org.apache.nemo.common.test;
+import org.apache.nemo.common.exception.OutputMismatchException;
+
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -63,7 +65,7 @@
try {
return Files.lines(path);
} catch (final IOException e) {
- throw new RuntimeException(e);
+ throw new OutputMismatchException(e);
}
})
.sorted()
@@ -87,7 +89,7 @@
+ "\n=============" + testResourceFileName + "=================="
+ resourceOutput
+ "\n===============================";
- throw new RuntimeException(outputMsg);
+ throw new OutputMismatchException(outputMsg);
}
}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index 5b878d9..d87d546 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -423,7 +423,7 @@
GenericSourceSink.write(result, outputFilePath);
}
- p.run();
+ p.run().waitUntilFinish();
LOG.info("JCT " + (System.currentTimeMillis() - start));
}
}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
index 3aa95c4..fd9f994 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
@@ -159,7 +159,7 @@
itemMatrix = itemMatrix.apply(new UpdateUserAndItemMatrix(numFeatures, lambda, rawData, parsedItemData));
}
- p.run();
+ p.run().waitUntilFinish();
LOG.info("JCT " + (System.currentTimeMillis() - start));
}
}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
index ad7f8e2..46d67fc 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
@@ -70,6 +70,6 @@
);
GenericSourceSink.write(result, outputFilePath);
- p.run();
+ p.run().waitUntilFinish();
}
}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
index 4ac1085..68e0777 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
@@ -82,6 +82,6 @@
//
// By default, it will write to a set of files with names like wordcounts-00001-of-00005
.apply(TextIO.write().to(outputFilePath));
- p.run();
+ p.run().waitUntilFinish();
}
}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index 3efbd72..b46ab77 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -459,7 +459,7 @@
model = model.apply(new UpdateModel(numFeatures, numClasses, i, readInput));
}
- p.run();
+ p.run().waitUntilFinish();
LOG.info("JCT " + (System.currentTimeMillis() - start));
}
}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
index aa56246..c652f46 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
@@ -121,7 +121,7 @@
}
}));
GenericSourceSink.write(result, outputFilePath);
- p.run();
+ p.run().waitUntilFinish();
}
/**
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
index 7845b18..48b2ce9 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
@@ -95,7 +95,7 @@
GenericSourceSink.write(longWords, outputFilePath + "_long");
GenericSourceSink.write(veryLongWords, outputFilePath + "_very_long");
GenericSourceSink.write(veryVeryLongWords, outputFilePath + "_very_very_long");
- p.run();
+ p.run().waitUntilFinish();
}
/**
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
index 5161bcd..3461b23 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
@@ -80,7 +80,7 @@
}
}));
GenericSourceSink.write(result, outputFilePath);
- p.run();
+ p.run().waitUntilFinish();
LOG.info("*******END*******");
LOG.info("JCT(ms): " + (System.currentTimeMillis() - start));
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
index c6fb382..1973970 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
@@ -93,7 +93,7 @@
GenericSourceSink.write(results[i], outputFilePath + "_" + i);
}
- p.run();
+ p.run().waitUntilFinish();
}
/**
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
index 8be20f3..4fd3e80 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -88,6 +88,6 @@
}
})), outputFilePath);
- p.run();
+ p.run().waitUntilFinish();
}
}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
index 36ce9d1..890a629 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
@@ -89,6 +89,6 @@
}).withSideInputs(windowedView)
).apply(new WriteOneFilePerWindow(outputFilePath, 1));
- p.run();
+ p.run().waitUntilFinish();
}
}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index 453c4d2..490f300 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -125,6 +125,6 @@
}))
.apply(new WriteOneFilePerWindow(outputFilePath, 1));
- p.run();
+ p.run().waitUntilFinish();
}
}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
index dcc5485..367938f 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
@@ -47,6 +47,19 @@
final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("WordCount");
+ final Pipeline p = generateWordCountPipeline(options, inputFilePath, outputFilePath);
+ p.run().waitUntilFinish();
+ }
+
+ /**
+ * Static method to generate the word count Beam pipeline.
+ * @param options options for the pipeline.
+ * @param inputFilePath the input file path.
+ * @param outputFilePath the output file path.
+ * @return the generated pipeline.
+ */
+ static Pipeline generateWordCountPipeline(final PipelineOptions options,
+ final String inputFilePath, final String outputFilePath) {
final Pipeline p = Pipeline.create(options);
final PCollection<String> result = GenericSourceSink.read(p, inputFilePath)
.apply(MapElements.<String, KV<String, Long>>via(new SimpleFunction<String, KV<String, Long>>() {
@@ -66,6 +79,6 @@
}
}));
GenericSourceSink.write(result, outputFilePath);
- p.run();
+ return p;
}
}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCountTimeOut1Sec.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCountTimeOut1Sec.java
new file mode 100644
index 0000000..20281f3
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCountTimeOut1Sec.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.nemo.examples.beam.WordCount.generateWordCountPipeline;
+
+/**
+ * WordCount application, but with a timeout of 1 second.
+ */
+public final class WordCountTimeOut1Sec {
+  private static final Logger LOG = LoggerFactory.getLogger(WordCountTimeOut1Sec.class.getName());
+
+  /**
+   * Private constructor.
+   */
+  private WordCountTimeOut1Sec() {
+  }
+
+  /**
+   * Runs the word count pipeline, waits at most 1 second for it, then cancels it.
+   *
+   * @param args the input file path followed by the output file path.
+   */
+  public static void main(final String[] args) {
+    final String inputFilePath = args[0];
+    final String outputFilePath = args[1];
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
+    options.setJobName("WordCountTimeOut1Sec");
+
+    final Pipeline p = generateWordCountPipeline(options, inputFilePath, outputFilePath);
+    final PipelineResult pr = p.run();
+    LOG.info("State after waiting for 1 second: {}", pr.waitUntilFinish(org.joda.time.Duration.standardSeconds(1)));
+    try {
+      LOG.info("State after cancel: {}", pr.cancel());
+    } catch (final IOException e) {
+      LOG.warn("IOException while cancelling job", e);
+    }
+  }
+}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/TimeoutITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/TimeoutITCase.java
new file mode 100644
index 0000000..23e024e
--- /dev/null
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/TimeoutITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.examples.beam;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.exception.OutputMismatchException;
+import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestArgs;
+import org.apache.nemo.common.test.ExampleTestUtil;
+import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test WordCount program with JobLauncher, but with a timeout.
+ * The user main is {@link WordCountTimeOut1Sec}, and the test passes only if an
+ * {@link OutputMismatchException} is raised.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public class TimeoutITCase {
+ private static ArgBuilder builder;
+
+ private static final String inputFileName = "inputs/test_input_wordcount";
+ private static final String outputFileName = "test_output_wordcount";
+ private static final String expectedOutputFileName = "outputs/expected_output_wordcount";
+ private static final String executorResourceFileName = ExampleTestArgs.getFileBasePath() + "executors/beam_test_executor_resources.json";
+ private static final String inputFilePath = ExampleTestArgs.getFileBasePath() + inputFileName;
+ private static final String outputFilePath = ExampleTestArgs.getFileBasePath() + outputFileName;
+
+ @Before
+ public void setUp() throws Exception {
+ // Common arguments: the user main class and its input/output paths.
+ builder = new ArgBuilder()
+ .addUserMain(WordCountTimeOut1Sec.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath);
+ }
+
+ // NOTE(review): the expected OutputMismatchException is presumably thrown by ensureOutputValidity, because the
+ // user main cancels the job after waiting 1 second and the complete output is not available - confirm.
+ @Test(timeout = ExampleTestArgs.TIMEOUT, expected = OutputMismatchException.class)
+ public void testTimeout() throws Exception {
+ JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
+ .addJobId(TimeoutITCase.class.getSimpleName() + "_wordcount")
+ .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+ .build());
+ try {
+ ExampleTestUtil.ensureOutputValidity(ExampleTestArgs.getFileBasePath(), outputFileName, expectedOutputFileName);
+ } finally {
+ // The output file is deleted whether or not the validation above throws.
+ ExampleTestUtil.deleteOutputFile(ExampleTestArgs.getFileBasePath(), outputFileName);
+ }
+ }
+}
diff --git a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
index b3ac05b..1bdefb8 100644
--- a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
@@ -28,6 +28,7 @@
import org.apache.nemo.runtime.common.metric.JobMetric;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.PlanRewriter;
+import org.apache.nemo.runtime.common.state.PlanState;
import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.RuntimeMaster;
import org.apache.nemo.runtime.master.metric.MetricStore;
@@ -100,15 +101,16 @@
// Wait for the job to finish and stop logging
final PlanStateManager planStateManager = executionResult.left();
final ScheduledExecutorService dagLoggingExecutor = executionResult.right();
+ final PlanState.State state;
try {
- planStateManager.waitUntilFinish();
+ state = planStateManager.waitUntilFinish();
dagLoggingExecutor.shutdown();
} finally {
planStateManager.storeJSON("final");
}
final long endTime = System.currentTimeMillis();
- LOG.info("{} is complete!", physicalPlan.getPlanId());
+ LOG.info("{} is complete, with final status {}!", physicalPlan.getPlanId(), state);
MetricStore.getStore().getOrCreateMetric(JobMetric.class, physicalPlan.getPlanId())
.setJobDuration(endTime - startTime);
} catch (final Exception e) {
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
index 5841adb..53cab57 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
@@ -458,18 +458,7 @@
* @return the final state of this plan.
*/
public PlanState.State waitUntilFinish() {
- finishLock.lock();
- try {
- while (!isPlanDone()) {
- planFinishedCondition.await();
- }
- } catch (final InterruptedException e) {
- LOG.warn("Interrupted during waiting the finish of Plan ID {}", planId);
- Thread.currentThread().interrupt();
- } finally {
- finishLock.unlock();
- }
- return getPlanState();
+ return waitUntilFinish(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
/**
@@ -490,7 +479,7 @@
}
}
} catch (final InterruptedException e) {
- LOG.warn("Interrupted during waiting the finish of Plan ID {}", planId);
+ LOG.warn("Interrupted while waiting for the finish of Plan ID {}", planId);
Thread.currentThread().interrupt();
} finally {
finishLock.unlock();