blob: fba90a11c1bd2a16790eeb78fed8cab21b3f5342 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.data;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
/**
 * An ordered, size-bounded collection of Tuples (duplicates allowed).
 *
 * Tuples are stored in a priority queue as they are added. Whenever the queue
 * grows beyond {@code limit} entries, the greatest tuple (according to the
 * bag's comparator) is evicted, so the bag retains at most {@code limit} of the
 * smallest tuples it has been given. The retained tuples are only sorted when
 * an iterator is requested.
 *
 * LimitedSortedDataBag is not spillable.
 *
 * We allow a user defined comparator, but provide a default comparator in
 * cases where the user doesn't specify one.
 */
public class LimitedSortedDataBag implements DataBag {
    private static final Log log = LogFactory.getLog(LimitedSortedDataBag.class);
    private static final long serialVersionUID = 1L;

    /**
     * Upper bound on the initial capacity of the backing priority queue, so a
     * very large {@code limit} does not eagerly allocate a huge array. The
     * queue still grows on demand, so this does not restrict how many tuples
     * can be retained.
     */
    private static final int MAX_INITIAL_CAPACITY = 1 << 16;

    private final Comparator<Tuple> mComp;
    private final PriorityQueue<Tuple> priorityQ;
    private final long limit;

    /**
     * @param comp Comparator to use to do the sorting.
     * If null, DefaultComparator will be used.
     * @param limit Maximum number of tuples the bag retains. A value of 0 or
     * less yields a bag that always stays empty.
     */
    public LimitedSortedDataBag(Comparator<Tuple> comp, long limit) {
        this.mComp = comp == null ? new DefaultComparator() : comp;
        this.limit = limit;
        // add() briefly holds limit + 1 tuples before evicting one, hence the
        // +1. PriorityQueue rejects an initial capacity < 1, and narrowing a
        // long above Integer.MAX_VALUE to int can produce a negative number,
        // so clamp into [1, MAX_INITIAL_CAPACITY]. Computing min() before
        // adding 1 avoids overflow when limit == Long.MAX_VALUE.
        int initialCapacity =
                (int) Math.max(1L, Math.min(limit, MAX_INITIAL_CAPACITY - 1L) + 1L);
        this.priorityQ = new PriorityQueue<Tuple>(
                initialCapacity, getReversedComparator(mComp));
    }

    /**
     * Get the number of elements in the bag in memory.
     * @return number of elements in the bag
     */
    @Override
    public long size() {
        return priorityQ.size();
    }

    /**
     * Find out if the bag is sorted.
     * @return true if this is a sorted data bag, false otherwise.
     */
    @Override
    public boolean isSorted() {
        return true;
    }

    /**
     * Find out if the bag is distinct.
     * @return true if the bag is a distinct bag, false otherwise.
     */
    @Override
    public boolean isDistinct() {
        return false;
    }

    /**
     * Get an iterator to the bag. Tuples are returned sorted according
     * to the bag's comparator. The iterator works on a snapshot taken when
     * it is created, so later adds are not visible through it.
     * @return tuple iterator
     */
    @Override
    public Iterator<Tuple> iterator() {
        return new LimitedSortedDataBagIterator();
    }

    /**
     * Add a tuple to the bag. If the bag then holds more than {@code limit}
     * tuples, the greatest tuple according to the comparator is dropped.
     * @param t tuple to add.
     */
    @Override
    public void add(Tuple t) {
        priorityQ.add(t);
        // The queue is ordered by the reversed comparator, so its head is the
        // greatest retained tuple; evicting it keeps the smallest `limit`.
        if (priorityQ.size() > limit) {
            priorityQ.poll();
        }
    }

    /**
     * Add contents of a bag to the bag.
     * @param b bag to add contents of.
     */
    @Override
    public void addAll(DataBag b) {
        Iterator<Tuple> it = b.iterator();
        while (it.hasNext()) {
            add(it.next());
        }
    }

    /**
     * Clear out the contents of the bag. This bag never spills, so only the
     * in-memory contents are affected.
     */
    @Override
    public void clear() {
        priorityQ.clear();
    }

    /**
     * Write a bag's contents to disk.
     * @param out DataOutput to write data to.
     * @throws IOException (passes it on from underlying calls).
     */
    @Override
    public void write(DataOutput out) throws IOException {
        // We don't care whether this bag was sorted or distinct because
        // using the iterator to write it will guarantee those things come
        // correctly. And on the other end there'll be no reason to waste
        // time re-sorting or re-applying distinct.
        out.writeLong(size());
        Iterator<Tuple> it = iterator();
        while (it.hasNext()) {
            Tuple item = it.next();
            item.write(out);
        }
    }

    /**
     * Read a bag from disk. Tuples are added through {@link #add(Tuple)},
     * so the limit is enforced while reading.
     * @param in DataInput to read data from.
     * @throws IOException (passes it on from underlying calls).
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        long size = in.readLong();
        for (long i = 0; i < size; i++) {
            add((Tuple) DataReaderWriter.readDatum(in));
        }
    }

    /**
     * Write the bag into a string, with tuples in sorted order.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append('{');
        Iterator<Tuple> it = iterator();
        while (it.hasNext()) {
            Tuple t = it.next();
            sb.append(t.toString());
            if (it.hasNext()) {
                sb.append(",");
            }
        }
        sb.append('}');
        return sb.toString();
    }

    /**
     * Compare this bag to another object. Bags are ordered first by size,
     * then tuple by tuple in this bag's sort order. Non-bag objects are
     * compared via {@code DataType.compare}.
     */
    @SuppressWarnings("unchecked")
    @Override
    public int compareTo(Object other) {
        if (this == other) {
            return 0;
        }
        if (!(other instanceof DataBag)) {
            return DataType.compare(this, other);
        }
        DataBag bOther = (DataBag) other;
        long thisSize = this.size();
        long otherSize = bOther.size();
        if (thisSize != otherSize) {
            return thisSize > otherSize ? 1 : -1;
        }
        // Both bags have the same size (which is <= limit). Copy the other bag
        // into a LimitedSortedDataBag with the same comparator and limit so
        // both iterators yield tuples in the same order and can be walked in
        // lock step.
        DataBag otherCloneDataBag = new LimitedSortedDataBag(mComp, limit);
        otherCloneDataBag.addAll(bOther);
        Iterator<Tuple> thisIt = this.iterator();
        Iterator<Tuple> otherIt = otherCloneDataBag.iterator();
        while (thisIt.hasNext() && otherIt.hasNext()) {
            Tuple thisT = thisIt.next();
            Tuple otherT = otherIt.next();
            int c = thisT.compareTo(otherT);
            if (c != 0) {
                return c;
            }
        }
        return 0; // if we got this far, they must be equal
    }

    /**
     * Not supported.
     * This is used by FuncEvalSpec.FakeDataBag.
     * @param stale Set stale state.
     * @throws UnsupportedOperationException always.
     */
    @Override
    public void markStale(boolean stale) {
        throw new UnsupportedOperationException("LimitedSortedDataBag cannot be marked stale");
    }

    /**
     * Not implemented: this bag never spills.
     * @return always 0.
     */
    @Override
    public long spill() {
        return 0;
    }

    /**
     * Not implemented.
     * @return always 0.
     */
    @Override
    public long getMemorySize() {
        return 0;
    }

    /**
     * Orders tuples by their natural {@code Tuple.compareTo} ordering.
     * All instances are interchangeable, so equality is by type.
     */
    private static class DefaultComparator implements Comparator<Tuple> {
        @Override
        @SuppressWarnings("unchecked")
        public int compare(Tuple t1, Tuple t2) {
            return t1.compareTo(t2);
        }

        @Override
        public boolean equals(Object o) {
            // Must be reflexive per the Object.equals contract; every
            // DefaultComparator imposes the same ordering.
            return o instanceof DefaultComparator;
        }

        @Override
        public int hashCode() {
            return 42;
        }
    }

    /**
     * Returns a comparator that imposes the reverse ordering of {@code comp}.
     * {@link Collections#reverseOrder(Comparator)} is available since Java 1.5.
     * It swaps the arguments instead of negating the result, which stays
     * correct when {@code comp} returns {@code Integer.MIN_VALUE}. It also
     * provides consistent equals/hashCode.
     * @param comp Comparator to reverse (non-null here)
     * @return reversed comparator
     */
    private static <T> Comparator<T> getReversedComparator(Comparator<T> comp) {
        return Collections.reverseOrder(comp);
    }

    /**
     * An iterator that handles getting the next tuple from the bag.
     * A priority queue iterator does not return elements in any particular
     * order, so the elements are copied into a List, sorted with the bag's
     * comparator, and served from that snapshot.
     */
    private class LimitedSortedDataBagIterator implements Iterator<Tuple> {
        private int mCntr;
        private final List<Tuple> mContents;

        public LimitedSortedDataBagIterator() {
            mCntr = 0;
            mContents = new ArrayList<>(priorityQ);
            Collections.sort(mContents, mComp);
        }

        @Override
        public boolean hasNext() {
            return mCntr < mContents.size();
        }

        /**
         * @throws NoSuchElementException if the iteration has no more tuples.
         */
        @Override
        public Tuple next() {
            if (!hasNext()) {
                throw new NoSuchElementException(
                        "No more tuples in LimitedSortedDataBag.iterator()");
            }
            // This will report progress every 1024 times through next.
            // This should be much faster than using mod.
            if ((mCntr & 0x3ff) == 0) {
                reportProgress();
            }
            return mContents.get(mCntr++);
        }

        /**
         * Not supported.
         * @throws UnsupportedOperationException always.
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException(
                    "Cannot remove() from LimitedSortedDataBag.iterator()");
        }
    }

    /**
     * Report progress through the reporter registered on PhysicalOperator,
     * if one is set.
     */
    protected void reportProgress() {
        if (PhysicalOperator.getReporter() != null) {
            PhysicalOperator.getReporter().progress();
        }
    }
}