blob: e651ae8e05e406b7a3080305d341efccdd346edf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class DeskewedJobTraceReader implements Closeable {
// underlying engine
private final JobTraceReader reader;
// configuration variables
private final int skewBufferLength;
private final boolean abortOnUnfixableSkew;
// state variables
private long skewMeasurementLatestSubmitTime = Long.MIN_VALUE;
private long returnedLatestSubmitTime = Long.MIN_VALUE;
private int maxSkewBufferNeeded = 0;
// a submit time will NOT be in countedRepeatedSubmitTimesSoFar if
// it only occurs once. This situation is represented by having the
// time in submitTimesSoFar only. A submit time that occurs twice or more
// appears in countedRepeatedSubmitTimesSoFar [with the appropriate range
// value] AND submitTimesSoFar
private TreeMap<Long, Integer> countedRepeatedSubmitTimesSoFar =
new TreeMap<Long, Integer>();
private TreeSet<Long> submitTimesSoFar = new TreeSet<Long>();
private final PriorityQueue<LoggedJob> skewBuffer;
static final private Log LOG =
LogFactory.getLog(DeskewedJobTraceReader.class);
static private class JobComparator implements Comparator<LoggedJob>,
Serializable {
@Override
public int compare(LoggedJob j1, LoggedJob j2) {
return (j1.getSubmitTime() < j2.getSubmitTime()) ? -1 : (j1
.getSubmitTime() == j2.getSubmitTime()) ? 0 : 1;
}
}
/**
* Constructor.
*
* @param reader
* the {@link JobTraceReader} that's being protected
* @param skewBufferLength
* [the number of late jobs that can preced a later out-of-order
* earlier job
* @throws IOException
*/
public DeskewedJobTraceReader(JobTraceReader reader, int skewBufferLength,
boolean abortOnUnfixableSkew) throws IOException {
this.reader = reader;
this.skewBufferLength = skewBufferLength;
this.abortOnUnfixableSkew = abortOnUnfixableSkew;
skewBuffer =
new PriorityQueue<LoggedJob>(skewBufferLength + 1, new JobComparator());
fillSkewBuffer();
}
public DeskewedJobTraceReader(JobTraceReader reader) throws IOException {
this(reader, 0, true);
}
private LoggedJob rawNextJob() throws IOException {
LoggedJob result = reader.getNext();
if ((!abortOnUnfixableSkew || skewBufferLength > 0) && result != null) {
long thisTime = result.getSubmitTime();
if (submitTimesSoFar.contains(thisTime)) {
Integer myCount = countedRepeatedSubmitTimesSoFar.get(thisTime);
countedRepeatedSubmitTimesSoFar.put(thisTime, myCount == null ? 2
: myCount + 1);
} else {
submitTimesSoFar.add(thisTime);
}
if (thisTime < skewMeasurementLatestSubmitTime) {
Iterator<Long> endCursor = submitTimesSoFar.descendingIterator();
int thisJobNeedsSkew = 0;
Long keyNeedingSkew;
while (endCursor.hasNext()
&& (keyNeedingSkew = endCursor.next()) > thisTime) {
Integer keyNeedsSkewAmount =
countedRepeatedSubmitTimesSoFar.get(keyNeedingSkew);
thisJobNeedsSkew +=
keyNeedsSkewAmount == null ? 1 : keyNeedsSkewAmount;
}
maxSkewBufferNeeded = Math.max(maxSkewBufferNeeded, thisJobNeedsSkew);
}
skewMeasurementLatestSubmitTime =
Math.max(thisTime, skewMeasurementLatestSubmitTime);
}
return result;
}
static class OutOfOrderException extends RuntimeException {
static final long serialVersionUID = 1L;
public OutOfOrderException(String text) {
super(text);
}
}
LoggedJob nextJob() throws IOException, OutOfOrderException {
LoggedJob newJob = rawNextJob();
if (newJob != null) {
skewBuffer.add(newJob);
}
LoggedJob result = skewBuffer.poll();
while (result != null && result.getSubmitTime() < returnedLatestSubmitTime) {
LOG.error("The current job was submitted earlier than the previous one");
LOG.error("Its jobID is " + result.getJobID());
LOG.error("Its submit time is " + result.getSubmitTime()
+ ",but the previous one was " + returnedLatestSubmitTime);
if (abortOnUnfixableSkew) {
throw new OutOfOrderException("Job submit time is "
+ result.getSubmitTime() + ",but the previous one was "
+ returnedLatestSubmitTime);
}
result = rawNextJob();
}
if (result != null) {
returnedLatestSubmitTime = result.getSubmitTime();
}
return result;
}
private void fillSkewBuffer() throws IOException {
for (int i = 0; i < skewBufferLength; ++i) {
LoggedJob newJob = rawNextJob();
if (newJob == null) {
return;
}
skewBuffer.add(newJob);
}
}
int neededSkewBufferSize() {
return maxSkewBufferNeeded;
}
@Override
public void close() throws IOException {
reader.close();
}
}