blob: 697f1928db1566eab447ff41499bd4992351bbf0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
/**
* This class provides a much more intuitive way to write Accumulator UDFs.<br>
* For example, you could express IsEmpty as follows:
* <pre><code>public class IsEmpty extends IteratingAccumulatorEvalFunc<Boolean> {
* public Boolean exec(Iterator<Tuple> iter) throws IOException {
* return !iter.hashNext();
* }
* }</code></pre>
* Count could be implemented as follows:
* <pre><code>public class Count extends IteratingAccumulatorEvalFunc<Long> {
* public Long exec(Iterator<Tuple> iter) throws IOException {
* long ct = 0;
* for (; iter.hasNext(); iter.next()) {
* ct++;
* }
* return ct;
* }
* }</code></pre>
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract class IteratingAccumulatorEvalFunc<T> extends AccumulatorEvalFunc<T> implements TerminatingAccumulator<T> {
private boolean isInitialized = false;
private DelayedQueueIterator dqi;
private BlockingQueue<Tuple> queue;
private volatile boolean isFinished = false;
private volatile boolean noMoreValues = false;
private volatile boolean exceptionThrown = false;
private T returnValue;
private Thread executionThread;
private Exception executionThreadException;
private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
private static final long WAIT_TO_OFFER = 500L;
private static final long WAIT_TO_POLL = 500L;
private static final long WAIT_TO_JOIN = 500L;
private void initialize() {
dqi = new DelayedQueueIterator();
queue = new LinkedBlockingQueue<Tuple>(10000);
executionThread = new Thread(new Runnable() {
@Override
public void run() {
try {
returnValue = exec(dqi);
isFinished = true;
} catch (Exception e) {
executionThreadException = e;
exceptionThrown = true;
}
}
});
executionThread.start();
isInitialized = true;
}
@Override
public boolean isFinished() {
return isFinished;
}
@Override
public void accumulate(Tuple input) throws IOException {
if (!isInitialized)
initialize();
for (Tuple t : (DataBag)input.get(0)) {
if (isFinished)
return;
boolean added = false;
while (!isFinished && !added && !exceptionThrown)
try {
added = queue.offer(t, WAIT_TO_OFFER, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
} //TODO handle the exception?
if (exceptionThrown)
throw new RuntimeException("Exception thrown in thread: ", executionThreadException);
}
}
@Override
public T getValue() {
noMoreValues = true;
do {
if (exceptionThrown)
throw new RuntimeException("Exception thrown in thread: ", executionThreadException);
try {
executionThread.join(WAIT_TO_JOIN);
} catch (InterruptedException e) {
} //TODO handle the exception?
} while (executionThread.isAlive());
return returnValue;
}
@Override
public void cleanup() {
returnValue = null;
dqi = null;
queue = null;
isFinished = false;
noMoreValues = false;
executionThread = null;
isInitialized = false;
}
public abstract T exec(Iterator<Tuple> iter) throws IOException;
private class DelayedQueueIterator implements Iterator<Tuple> {
private Tuple next;
@Override
public boolean hasNext() {
if (next != null)
return true;
while (!noMoreValues) {
try {
next = queue.poll(WAIT_TO_POLL, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (next != null)
return true;
}
next = queue.poll();
if (next != null)
return true;
return false;
}
@Override
public Tuple next() {
Tuple t = next;
if (t == null)
throw new RuntimeException("Entered inconsistent state!");
next = null;
return t;
}
@Override
public void remove() {}
}
}