blob: fba90a11c1bd2a16790eeb78fed8cab21b3f5342 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
/**
 * An ordered collection of Tuples, possibly containing duplicates, that
 * retains at most {@code limit} of them. Data is stored in a priority queue
 * as it comes in, and is only sorted when an iterator is requested.
 * LimitedSortedDataBag is not spillable.
 * We allow a user-defined comparator, but provide a default comparator in
 * cases where the user doesn't specify one.
 */
public class LimitedSortedDataBag implements DataBag {
// NOTE(review): not referenced in the visible part of this file.
private static final Log log = LogFactory.getLog(LimitedSortedDataBag.class);
private static final long serialVersionUID = 1L;
// Ordering exposed by iterator(): the caller's comparator, or
// DefaultComparator when none was supplied.
private final Comparator<Tuple> mComp;
// Retained tuples, kept in a queue ordered by the reverse of mComp
// (see the constructor).
private final PriorityQueue<Tuple> priorityQ;
// Maximum number of tuples the bag keeps; checked in add().
private final long limit;
* @param comp Comparator to use to do the sorting.
* If null, DefaultComparator will be used.
public LimitedSortedDataBag(Comparator<Tuple> comp, long limit) {
this.mComp = comp == null ? new DefaultComparator() : comp;
this.limit = limit;
this.priorityQ = new PriorityQueue<Tuple>(
(int)limit, getReversedComparator(mComp));
* Get the number of elements in the bag in memory.
* @return number of elements in the bag
public long size() {
return priorityQ.size();
* Find out if the bag is sorted.
* @return true if this is a sorted data bag, false otherwise.
public boolean isSorted() {
return true;
* Find out if the bag is distinct.
* @return true if the bag is a distinct bag, false otherwise.
public boolean isDistinct() {
return false;
* Get an iterator to the bag. For default and distinct bags,
* no particular order is guaranteed. For sorted bags the order
* is guaranteed to be sorted according
* to the provided comparator.
* @return tuple iterator
public Iterator<Tuple> iterator() {
return new LimitedSortedDataBagIterator();
* Add a tuple to the bag.
* @param t tuple to add.
public void add(Tuple t) {
if (priorityQ.size() > limit) {
* Add contents of a bag to the bag.
* @param b bag to add contents of.
public void addAll(DataBag b) {
Iterator<Tuple> it = b.iterator();
while(it.hasNext()) {
* Clear out the contents of the bag, both on disk and in memory.
* Any attempts to read after this is called will produce undefined
* results.
public void clear() {
* Write a bag's contents to disk.
* @param out DataOutput to write data to.
* @throws IOException (passes it on from underlying calls).
public void write(DataOutput out) throws IOException {
// We don't care whether this bag was sorted or distinct because
// using the iterator to write it will guarantee those things come
// correctly. And on the other end there'll be no reason to waste
// time re-sorting or re-applying distinct.
Iterator<Tuple> it = iterator();
while (it.hasNext()) {
Tuple item =;
* Read a bag from disk.
* @param in DataInput to read data from.
* @throws IOException (passes it on from underlying calls).
public void readFields(DataInput in) throws IOException {
long size = in.readLong();
for (long i = 0; i < size; i++) {
try {
Object o = DataReaderWriter.readDatum(in);
} catch (ExecException ee) {
throw ee;
* Write the bag into a string.
public String toString() {
StringBuffer sb = new StringBuffer();
Iterator<Tuple> it = iterator();
while ( it.hasNext() ) {
Tuple t =;
String s = t.toString();
if (it.hasNext()) sb.append(",");
return sb.toString();
public int compareTo(Object other) {
if (this == other)
return 0;
if (other instanceof DataBag) {
DataBag bOther = (DataBag) other;
if (this.size() != bOther.size()) {
if (this.size() > bOther.size()) return 1;
else return -1;
// if we got this far, both bags should have same size
// make a LimitedSortedBag for the other bag with same comparator and limit
// so that both bag are sorted and we can loop through both iterators
DataBag otherCloneDataBag = new LimitedSortedDataBag(mComp, limit);
otherCloneDataBag.addAll((DataBag) other);
Iterator<Tuple> thisIt = this.iterator();
Iterator<Tuple> otherIt = otherCloneDataBag.iterator();
while (thisIt.hasNext() && otherIt.hasNext()) {
Tuple thisT =;
Tuple otherT =;
int c = thisT.compareTo(otherT);
if (c != 0) return c;
return 0; // if we got this far, they must be equal
} else {
return, other);
* Not implemented.
* This is used by FuncEvalSpec.FakeDataBag.
* @param stale Set stale state.
public void markStale(boolean stale) {
throw new RuntimeException("LimitedSortedDataBag cannot be marked stale");
* Not implemented.
public long spill() {
return 0;
* Not implemented.
public long getMemorySize() {
return 0;
private static class DefaultComparator implements Comparator<Tuple> {
public int compare(Tuple t1, Tuple t2) {
return t1.compareTo(t2);
public boolean equals(Object o) {
return false;
public int hashCode() {
return 42;
* Since comparator in Java 1.7 does not have reversed(),
* we need this method to get a reversed comparator for a given comparator
* @param comp Comparator to reverse
* @return reversed comparator
private <T> Comparator<T> getReversedComparator(final Comparator<T> comp) {
return new Comparator<T>() {
public int compare(T o1, T o2) {
return, o2);
public boolean equals(Object o) {
return comp.equals(o);
/**
 * An iterator that handles getting the next tuple from the bag.
 * Because a priority queue's own iterator does not return elements in any
 * particular order, the elements are copied into a List, the List is
 * sorted, and iteration proceeds over the List.
 */
private class LimitedSortedDataBagIterator implements Iterator<Tuple> {
// Non-static inner class: its constructor reads the enclosing bag's
// priorityQ and mComp.
// Index of the tuple that the next call to next() returns.
private int mCntr;
// Sorted snapshot of the bag's tuples, taken when the iterator is created.
private final List<Tuple> mContents;
/**
 * Copies the enclosing bag's queue into a list and sorts it with the bag's
 * comparator. Later changes to the bag do not affect this iterator.
 */
public LimitedSortedDataBagIterator() {
    List<Tuple> snapshot = new ArrayList<>(priorityQ);
    Collections.sort(snapshot, mComp);
    mContents = snapshot;
    mCntr = 0;
}
public boolean hasNext() {
return (mCntr < mContents.size());
public Tuple next() {
// This will report progress every 1024 times through next.
// This should be much faster than using mod.
if ((mCntr & 0x3ff) == 0) reportProgress();
return mContents.get(mCntr++);
* Not implemented.
public void remove() {
throw new RuntimeException("Cannot remove() from LimitedSortedDataBag.iterator()");
* Report progress to HDFS.
protected void reportProgress() {
if (PhysicalOperator.getReporter() != null) {