// blob: a6531fce3d8cd2c25f9412c1edfa5628133d858d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
/**
 * Keeps the Ranges sorted by startIndex.
 * The added ranges are always ensured to be non-overlapping: when a range is
 * added, every stored range it overlaps or touches is merged into it.
 * Provides the SkipRangeIterator, which skips the Ranges
 * stored in this object.
 * <p>
 * The methods of this class synchronize on the instance. Note that the set
 * returned by {@link #getRanges()} and the iterator returned by
 * {@link #skipRangeIterator()} are backed by the internal set and are
 * therefore not covered by that lock.
 */
class SortedRanges implements Writable{

  private static final Log LOG =
    LogFactory.getLog(SortedRanges.class);

  /** Stored ranges, ordered by start index (then length). */
  private TreeSet<Range> ranges = new TreeSet<Range>();
  /** Total number of indices covered by {@link #ranges}. */
  private long indicesCount;

  /**
   * Get Iterator which skips the stored ranges.
   * The Iterator.next() call return the index starting from 0.
   * <p>
   * The iterator walks the live internal set; calling {@link #add(Range)} or
   * {@link #remove(Range)} while it is in use may make it fail with a
   * {@code ConcurrentModificationException}.
   * @return SkipRangeIterator
   */
  synchronized SkipRangeIterator skipRangeIterator(){
    return new SkipRangeIterator(ranges.iterator());
  }

  /**
   * Get the no of indices stored in the ranges.
   * @return indices count
   */
  synchronized long getIndicesCount() {
    return indicesCount;
  }

  /**
   * Get the sorted set of ranges.
   * <p>
   * This is the live internal set, not a copy. Modifying it directly
   * bypasses the bookkeeping of {@link #getIndicesCount()}.
   * @return ranges
   */
  synchronized SortedSet<Range> getRanges() {
    return ranges;
  }

  /**
   * Add the range indices. It is ensured that the added range
   * doesn't overlap the existing ranges. If it overlaps (or touches), the
   * existing overlapping ranges are removed and a single range
   * having the superset of all the removed ranges and this range
   * is added.
   * If the range is of 0 length, doesn't do anything.
   * @param range Range to be added.
   */
  synchronized void add(Range range){
    if(range.isEmpty()) {
      return;
    }

    long startIndex = range.getStartIndex();
    long endIndex = range.getEndIndex();

    // Merge with the range ordered just before this one, if it reaches us.
    // isEmpty() on a subset view avoids counting the whole view like size().
    SortedSet<Range> headSet = ranges.headSet(range);
    if(!headSet.isEmpty()) {
      Range previousRange = headSet.last();
      if(LOG.isDebugEnabled()) {
        LOG.debug("previousRange "+previousRange);
      }
      // '<=' also merges a previousRange that ends exactly where this range
      // starts, matching the '>=' test applied to the following ranges below,
      // so the result does not depend on insertion order.
      if(startIndex<=previousRange.getEndIndex()) {
        //previousRange overlaps or touches this range: remove it
        if(ranges.remove(previousRange)) {
          indicesCount-=previousRange.getLength();
        }
        //expand this range to cover previousRange
        startIndex = previousRange.getStartIndex();
        endIndex = Math.max(endIndex, previousRange.getEndIndex());
      }
    }

    // Absorb every following range that overlaps or touches this one.
    Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
    while(tailSetIt.hasNext()) {
      Range nextRange = tailSetIt.next();
      if(LOG.isDebugEnabled()) {
        LOG.debug("nextRange "+nextRange +" startIndex:"+startIndex+
            " endIndex:"+endIndex);
      }
      if(endIndex<nextRange.getStartIndex()) {
        // ranges are sorted, so nothing further can overlap
        break;
      }
      //nextRange overlaps this range: remove it
      tailSetIt.remove();
      indicesCount-=nextRange.getLength();
      if(endIndex<nextRange.getEndIndex()) {
        //nextRange extends past this range: expand and stop
        endIndex = nextRange.getEndIndex();
        break;
      }
    }
    add(startIndex,endIndex);
  }

  /**
   * Remove the range indices. If this range is
   * found in existing ranges, the existing ranges
   * are shrunk.
   * If range is of 0 length, doesn't do anything.
   * @param range Range to be removed.
   */
  synchronized void remove(Range range) {
    if(range.isEmpty()) {
      return;
    }
    long startIndex = range.getStartIndex();
    long endIndex = range.getEndIndex();

    // Narrow down the range ordered just before this one, if it overlaps.
    SortedSet<Range> headSet = ranges.headSet(range);
    if(!headSet.isEmpty()) {
      Range previousRange = headSet.last();
      if(LOG.isDebugEnabled()) {
        LOG.debug("previousRange "+previousRange);
      }
      if(startIndex<previousRange.getEndIndex()) {
        //previousRange overlaps this range
        if(ranges.remove(previousRange)) {
          indicesCount-=previousRange.getLength();
          if(LOG.isDebugEnabled()) {
            LOG.debug("removed previousRange "+previousRange);
          }
        }
        // keep the part of previousRange before the removed range ...
        add(previousRange.getStartIndex(), startIndex);
        // ... and the part after it, if previousRange extends that far
        if(endIndex<=previousRange.getEndIndex()) {
          add(endIndex, previousRange.getEndIndex());
        }
      }
    }

    // Drop or narrow down every following range that overlaps this one.
    Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
    while(tailSetIt.hasNext()) {
      Range nextRange = tailSetIt.next();
      if(LOG.isDebugEnabled()) {
        LOG.debug("nextRange "+nextRange +" startIndex:"+startIndex+
            " endIndex:"+endIndex);
      }
      if(endIndex<=nextRange.getStartIndex()) {
        // ranges are sorted, so nothing further can overlap
        break;
      }
      //nextRange overlaps this range: remove it
      tailSetIt.remove();
      indicesCount-=nextRange.getLength();
      if(endIndex<nextRange.getEndIndex()) {
        //keep the tail of nextRange beyond the removed range and stop
        add(endIndex, nextRange.getEndIndex());
        break;
      }
    }
  }

  /**
   * Store the range [start, end) and account for it in the indices count.
   * Does nothing when end <= start. Does not check for overlaps; callers are
   * responsible for having removed any overlapping ranges first.
   */
  private void add(long start, long end) {
    if(end>start) {
      Range recRange = new Range(start, end-start);
      ranges.add(recRange);
      indicesCount+=recRange.getLength();
      if(LOG.isDebugEnabled()) {
        LOG.debug("added "+recRange);
      }
    }
  }

  /**
   * Replace the contents with the serialized form written by
   * {@link #write(DataOutput)}: the indices count, the number of ranges,
   * then each range.
   */
  public synchronized void readFields(DataInput in) throws IOException {
    indicesCount = in.readLong();
    ranges = new TreeSet<Range>();
    int size = in.readInt();
    for(int i=0;i<size;i++) {
      Range range = new Range();
      range.readFields(in);
      ranges.add(range);
    }
  }

  /**
   * Serialize as the indices count, the number of ranges, then each range
   * in sorted order.
   */
  public synchronized void write(DataOutput out) throws IOException {
    out.writeLong(indicesCount);
    out.writeInt(ranges.size());
    for(Range range : ranges) {
      range.write(out);
    }
  }

  /**
   * One line per stored range, in sorted order. Synchronized like the other
   * accessors so the set is not iterated while being modified.
   */
  public synchronized String toString() {
    StringBuilder sb = new StringBuilder();
    for(Range range : ranges) {
      sb.append(range).append('\n');
    }
    return sb.toString();
  }

  /**
   * Index Range. Comprises of start index and length.
   * A Range can be of 0 length also. The Range stores indices
   * of type long.
   */
  static class Range implements Comparable<Range>, Writable{
    private long startIndex;
    private long length;

    /**
     * @param startIndex first index of the range (inclusive)
     * @param length number of indices in the range
     * @throws IllegalArgumentException if length is negative
     */
    Range(long startIndex, long length) {
      if(length<0) {
        throw new IllegalArgumentException("length can't be negative");
      }
      this.startIndex = startIndex;
      this.length = length;
    }

    /** Creates the empty range starting at 0. */
    Range() {
      this(0,0);
    }

    /**
     * Get the start index. Start index in inclusive.
     * @return startIndex.
     */
    long getStartIndex() {
      return startIndex;
    }

    /**
     * Get the end index. End index is exclusive.
     * @return endIndex.
     */
    long getEndIndex() {
      return startIndex + length;
    }

    /**
     * Get Length.
     * @return length
     */
    long getLength() {
      return length;
    }

    /**
     * Range is empty if its length is zero.
     * @return <code>true</code> if empty
     *         <code>false</code> otherwise.
     */
    boolean isEmpty() {
      return length==0;
    }

    /** Two ranges are equal when both start index and length match. */
    public boolean equals(Object o) {
      if(this == o) {
        return true;
      }
      if(!(o instanceof Range)) {
        return false;
      }
      Range range = (Range)o;
      return startIndex==range.startIndex &&
        length==range.length;
    }

    /** Consistent with {@link #equals(Object)}; not symmetric in its fields. */
    public int hashCode() {
      return 31 * Long.valueOf(startIndex).hashCode() +
        Long.valueOf(length).hashCode();
    }

    /**
     * Orders by start index, then by length. This is a proper total order
     * (antisymmetric and transitive) and is consistent with
     * {@link #equals(Object)}.
     */
    public int compareTo(Range o) {
      if(startIndex != o.startIndex) {
        return startIndex < o.startIndex ? -1 : 1;
      }
      if(length != o.length) {
        return length < o.length ? -1 : 1;
      }
      return 0;
    }

    /**
     * Reads the start index and length.
     * @throws IOException if the stream ends early or the length is negative
     */
    public void readFields(DataInput in) throws IOException {
      long start = in.readLong();
      long len = in.readLong();
      // the constructor's invariant must also hold for deserialized ranges
      if(len<0) {
        throw new IOException("length can't be negative: " + len);
      }
      startIndex = start;
      length = len;
    }

    /** Writes the start index followed by the length. */
    public void write(DataOutput out) throws IOException {
      out.writeLong(startIndex);
      out.writeLong(length);
    }

    public String toString() {
      return startIndex +":" + length;
    }
  }

  /**
   * Index Iterator which skips the stored ranges.
   */
  static class SkipRangeIterator implements Iterator<Long> {
    Iterator<Range> rangeIterator;
    /** The range currently being compared against; starts as [0, 0). */
    Range range = new Range();
    /** The index the next call to {@link #next()} will return. */
    long next = -1;

    /**
     * Constructor
     * @param rangeIterator the iterator which gives the ranges.
     */
    SkipRangeIterator(Iterator<Range> rangeIterator) {
      this.rangeIterator = rangeIterator;
      doNext();
    }

    /**
     * Returns true till the index reaches Long.MAX_VALUE.
     * @return <code>true</code> next index exists.
     *         <code>false</code> otherwise.
     */
    public synchronized boolean hasNext() {
      return next<Long.MAX_VALUE;
    }

    /**
     * Get the next available index. The index starts from 0.
     * @return next index
     */
    public synchronized Long next() {
      long ci = next;
      doNext();
      return ci;
    }

    /**
     * Advance {@link #next} by one, then past any stored ranges covering it.
     */
    private void doNext() {
      next++;
      // called once per returned index, so avoid building the string unless needed
      if(LOG.isDebugEnabled()) {
        LOG.debug("currentIndex "+next +" "+range);
      }
      skipIfInRange();
      while(next>=range.getEndIndex() && rangeIterator.hasNext()) {
        range = rangeIterator.next();
        skipIfInRange();
      }
    }

    /** If {@link #next} falls inside the current range, jump to its end. */
    private void skipIfInRange() {
      if(next>=range.getStartIndex() &&
          next<range.getEndIndex()) {
        //need to skip the range
        LOG.warn("Skipping index " + next +"-" + range.getEndIndex());
        next = range.getEndIndex();
      }
    }

    /**
     * Get whether all the ranges have been skipped.
     * @return <code>true</code> if all ranges have been skipped.
     *         <code>false</code> otherwise.
     */
    synchronized boolean skippedAllRanges() {
      return !rangeIterator.hasNext() && next>range.getEndIndex();
    }

    /**
     * Remove is not supported. Doesn't apply.
     */
    public void remove() {
      throw new UnsupportedOperationException("remove not supported.");
    }
  }
}