/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.WeakHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.SelfSpillBag.MemoryLimits;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.GroupingSpillable;
import org.apache.pig.impl.util.Spillable;
import org.apache.pig.impl.util.SpillableMemoryManager;
import com.google.common.collect.Maps;
/**
* Do partial aggregation in the map plan. Inputs are buffered in
* a hashmap until a threshold is reached; the combiner functions are
* then fed these buffered inputs, and the results are stored in a
* secondary map. Once that map fills up or all input has been seen,
* the results are emitted to the next operator (the caller of getNext()).
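* <p>
* As a rough illustration (hypothetical data, assuming an algebraic COUNT):
* a few thousand map-side records sharing the key "k1" are first buffered under
* "k1" in the raw map, then collapsed by the combiner plan into a single partial
* result such as (k1, 3500) in the processed map, and only that one record is
* emitted downstream instead of the thousands of raw records.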
*/
public class POPartialAgg extends PhysicalOperator implements Spillable, GroupingSpillable {
private static final Log LOG = LogFactory.getLog(POPartialAgg.class);
private static final long serialVersionUID = 1L;
private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
null);
// number of records to sample to determine average size used by each
// entry in hash map and average seen reduction
private static final int NUM_RECS_TO_SAMPLE = 10000;
// We want to allow bigger list sizes for group all.
// But still have a cap on it to avoid JVM finding it hard to allocate space
// TODO: How high can we go without performance degradation??
private static final int MAX_LIST_SIZE = 25000;
// We want to avoid massive ArrayList copies as they get big.
// ArrayLists grow by prevSize + prevSize/2. Given the default initial capacity of 10,
// 9369 is the capacity reached after 17 such resizings. This seems like a sufficiently
// large value to trigger spilling/aggregation instead of paying for yet another data
// copy.
// For group all cases, we will set this to a higher value
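// (Growth sequence from the default capacity of 10: 10, 15, 22, 33, 49, 73, 109, 163,
// 244, 366, 549, 823, 1234, 1851, 2776, 4164, 6246, 9369.)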
private int listSizeThreshold = 9367;
// Use a default min reduction of 7 instead of 10: the processedInputMap capacity
// will be 4096 either way (HashMap capacity is a power of 2, for both 20000/10 and 20000/7),
// and even a 7x reduction makes map-side aggregation worthwhile.
private static final int DEFAULT_MIN_REDUCTION = 7;
// TODO: these are temporary. The real thing should be using memory usage estimation.
private static final int FIRST_TIER_THRESHOLD = 20000;
private static final int SECOND_TIER_THRESHOLD = FIRST_TIER_THRESHOLD / DEFAULT_MIN_REDUCTION;
private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new WeakHashMap<POPartialAgg, Byte>();
private PhysicalPlan keyPlan;
private ExpressionOperator keyLeaf;
private List<PhysicalPlan> valuePlans;
private List<ExpressionOperator> valueLeaves;
private boolean isGroupAll;
private transient int numRecsInRawMap;
private transient int numRecsInProcessedMap;
private transient Map<Object, List<Tuple>> rawInputMap;
private transient Map<Object, List<Tuple>> processedInputMap;
// Transient booleans always initialize to false
private transient boolean initialized;
private transient boolean disableMapAgg;
private transient boolean sizeReductionChecked;
private transient boolean inputsExhausted;
private transient boolean estimatedMemThresholds;
// The doSpill flag is set when spilling is running or needs to run.
// It is set by POPartialAgg when its buffers are full after having run aggregations and
// the records have to be emitted to the map output.
// The doContingentSpill flag is set when the SpillableMemoryManager is notified
// by GC that the runtime is low on memory and the SpillableMemoryManager identifies
// the particular buffer as a good spill candidate because it is large. The contingent spill logic tries
// to satisfy the memory manager's request for freeing memory by aggregating data
// rather than just spilling records to disk.
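// In other words: the SpillableMemoryManager thread calls spill(), sets doContingentSpill
// and waits (up to roughly 5 seconds) for the main thread; the main thread notices the flag
// in getNextTuple(), acknowledges it via startedContingentSpill, tries aggregating in place,
// and either clears doContingentSpill or escalates to a full spill by setting doSpill.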
private transient volatile boolean doSpill;
private transient volatile boolean doContingentSpill;
private transient volatile boolean startedContingentSpill;
private transient volatile Object spillLock;
private transient int minOutputReduction;
private transient float percentUsage;
private transient int numRecordsToSample;
private transient int firstTierThreshold;
private transient int secondTierThreshold;
private transient int sizeReduction;
private transient int avgTupleSize;
private transient Iterator<Entry<Object, List<Tuple>>> spillingIterator;
public POPartialAgg(OperatorKey k) {
this(k, false);
}
public POPartialAgg(OperatorKey k, boolean isGroupAll) {
super(k);
this.isGroupAll = isGroupAll;
}
private void init() throws ExecException {
numRecsInRawMap = 0;
numRecsInProcessedMap = 0;
minOutputReduction = DEFAULT_MIN_REDUCTION;
numRecordsToSample = NUM_RECS_TO_SAMPLE;
firstTierThreshold = FIRST_TIER_THRESHOLD;
secondTierThreshold = SECOND_TIER_THRESHOLD;
sizeReduction = 1;
avgTupleSize = 0;
percentUsage = 0.2F;
spillLock = new Object();
if (PigMapReduce.sJobConfInternal.get() != null) {
String usage = PigMapReduce.sJobConfInternal.get().get(
PigConfiguration.PIG_CACHEDBAG_MEMUSAGE);
if (usage != null) {
percentUsage = Float.parseFloat(usage);
}
minOutputReduction = PigMapReduce.sJobConfInternal.get().getInt(
PigConfiguration.PIG_EXEC_MAP_PARTAGG_MINREDUCTION, DEFAULT_MIN_REDUCTION);
if (minOutputReduction <= 0) {
LOG.info("Specified reduction is < 0 (" + minOutputReduction + "). Using default " +
DEFAULT_MIN_REDUCTION);
minOutputReduction = DEFAULT_MIN_REDUCTION;
}
}
if (percentUsage <= 0) {
LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
disableMapAgg = true;
// Set them to true instead of adding another check for !disableMapAgg
sizeReductionChecked = true;
estimatedMemThresholds = true;
} else {
ALL_POPARTS.put(this, null);
SpillableMemoryManager.getInstance().registerSpillable(this);
}
// Avoid hashmap resizing. TODO: Investigate load factor of 0.90 or 1.0
// newHashMapWithExpectedSize does new HashMap(expectedSize + expectedSize/3)
// to factor in the default load factor of 0.75.
// A HashMap's internal capacity is always a power of 2.
// So for NUM_RECS_TO_SAMPLE=10000, the hashmap capacity will be 16384.
// With secondTierThreshold of 2857 (minReduction 7), the hashmap capacity will be 4096.
if (!disableMapAgg) {
rawInputMap = Maps.newHashMapWithExpectedSize(NUM_RECS_TO_SAMPLE);
processedInputMap = Maps.newHashMapWithExpectedSize(SECOND_TIER_THRESHOLD);
}
if (isGroupAll) {
listSizeThreshold = Math.min(numRecordsToSample, MAX_LIST_SIZE);
}
initialized = true;
}
@Override
public Result getNextTuple() throws ExecException {
// accumulate tuples from processInput in rawInputMap.
// when the maps grow to mem limit, go over each item in map, and call
// combiner aggs on each collection.
// Store the results into processedInputMap. Clear out rawInputMap.
// Mem usage is updated every time we modify either of the maps.
// When processedInputMap is >= 20% of allotted memory, run aggs on it,
// and output the results as returns of successive calls of this method.
// Then reset processedInputMap.
// The fact that we are in the latter stage is communicated via the doSpill
// flag.
if (!initialized) {
init();
}
while (true) {
if (!sizeReductionChecked && numRecsInRawMap >= numRecordsToSample) {
checkSizeReduction();
if (doContingentSpill && !doSpill) {
LOG.info("Avoided emitting records during spill memory call.");
doContingentSpill = false;
}
}
if (!estimatedMemThresholds && numRecsInRawMap >= numRecordsToSample) {
estimateMemThresholds();
}
if (doContingentSpill) {
startedContingentSpill = true;
// Don't aggregate if spilling. Avoid concurrent update of spilling iterator.
if (!doSpill) {
// SpillableMemoryManager requested a spill to reduce memory
// consumption. See if we can avoid it.
// If we see that there are way more records in processed map than
// raw map, it is better to spill after the aggregation
boolean spillProcessedMap = (3 * numRecsInRawMap) < numRecsInProcessedMap;
aggregateBothLevels(false, false);
// Spill if the processed map was already dominant or is still over the spill threshold
if (spillProcessedMap || shouldSpill()) {
startSpill(false);
} else {
LOG.info("Avoided emitting records during spill memory call.");
doContingentSpill = false;
}
}
}
if (doSpill) {
startSpill(true);
Result result = spillResult();
if (result.returnStatus == POStatus.STATUS_EOP) {
doSpill = false;
doContingentSpill = false;
}
if (result.returnStatus != POStatus.STATUS_EOP) {
return result;
} else if (inputsExhausted) {
freeMemory();
return result;
}
}
if (mapAggDisabled()) {
// disableMapAgg() sets doSpill, so we can't get here while there is still content in the buffered maps.
// If we get to this point, everything has been flushed, so we can simply return the raw tuples from now on.
freeMemory();
return processInput();
} else {
Result inp = processInput();
if (inp.returnStatus == POStatus.STATUS_ERR) {
return inp;
} else if (inp.returnStatus == POStatus.STATUS_EOP) {
if (parentPlan.endOfAllInput) {
// parent input is over. flush what we have.
inputsExhausted = true;
LOG.info("Spilling last bits.");
startSpill(true);
continue;
} else {
return EOP_RESULT;
}
} else if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
} else {
// add this input to map.
Tuple inpTuple = (Tuple) inp.result;
keyPlan.attachInput(inpTuple);
// evaluate the key
Result keyRes = getResult(keyLeaf);
if (keyRes.returnStatus != POStatus.STATUS_OK) {
return keyRes;
}
Object key = keyRes.result;
keyPlan.detachInput();
numRecsInRawMap += 1;
addKeyValToMap(rawInputMap, key, inpTuple);
aggregateBothLevels(true, true);
if (shouldSpill()) {
startSpill(false); // next time around, we'll start emitting.
}
}
}
}
}
private void freeMemory() throws ExecException {
if (rawInputMap != null && !rawInputMap.isEmpty()) {
throw new ExecException("Illegal state. Trying to free up partial aggregation maps when they are not empty");
}
// Free up the maps for garbage collection
rawInputMap = null;
processedInputMap = null;
}
private void estimateMemThresholds() {
if (!mapAggDisabled()) {
LOG.info("Getting mem limits; considering " + ALL_POPARTS.size()
+ " POPArtialAgg objects." + " with memory percentage "
+ percentUsage);
MemoryLimits memLimits = new MemoryLimits(ALL_POPARTS.size(), percentUsage);
int estTotalMem = 0;
int estTuples = 0;
for (Map.Entry<Object, List<Tuple>> entry : rawInputMap.entrySet()) {
for (Tuple t : entry.getValue()) {
estTuples += 1;
int mem = (int) t.getMemorySize();
estTotalMem += mem;
memLimits.addNewObjSize(mem);
}
}
avgTupleSize = estTotalMem / estTuples;
long totalTuples = memLimits.getCacheLimit();
LOG.info("Estimated total tuples to buffer, based on " + estTuples + " tuples that took up " + estTotalMem + " bytes: " + totalTuples);
firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
secondTierThreshold = (int) (0.5 + totalTuples * (1f / sizeReduction));
LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold);
// The second tier should at least allow one tuple before it tries to aggregate.
// This code retains the total number of tuples in the buffer while guaranteeing
// the second tier has at least one tuple.
if (secondTierThreshold == 0) {
secondTierThreshold += 1;
firstTierThreshold -= 1;
}
if (isGroupAll) {
listSizeThreshold = Math.min(firstTierThreshold, MAX_LIST_SIZE);
}
}
estimatedMemThresholds = true;
}
private void checkSizeReduction() throws ExecException {
if (!mapAggDisabled()) {
int numBeforeReduction = numRecsInProcessedMap + numRecsInRawMap;
aggregateBothLevels(false, false);
int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap;
LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
LOG.info("Observed reduction factor: from " + numBeforeReduction +
" to " + numAfterReduction +
" => " + numBeforeReduction / numAfterReduction + ".");
if (numBeforeReduction / numAfterReduction < minOutputReduction) {
LOG.info("Disabling in-memory aggregation, since observed reduction is less than " + minOutputReduction);
disableMapAgg();
}
sizeReduction = numBeforeReduction / numAfterReduction;
sizeReductionChecked = true;
}
}
private void disableMapAgg() throws ExecException {
// No need to aggregate here: by the time disableMapAgg() is called, aggregation
// has already been run and the size reduction has been checked.
startSpill(false);
disableMapAgg = true;
ALL_POPARTS.remove(this);
}
private boolean mapAggDisabled() {
return disableMapAgg;
}
private boolean shouldAggregateFirstLevel() {
return (numRecsInRawMap > firstTierThreshold);
}
private boolean shouldAggregateSecondLevel() {
return (numRecsInProcessedMap > secondTierThreshold);
}
private boolean shouldSpill() {
// is this always the same as shouldAgg?
return shouldAggregateSecondLevel();
}
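// Moves a single-element value list straight into the target map: if the key is new,
// the list is reused as-is; otherwise its lone tuple is appended to the existing list.
// Used by aggregate() to skip running the value plans for keys that occur only once.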
private void addKeySingleValToMap(Map<Object, List<Tuple>> map,
Object key, List<Tuple> inpTuple) throws ExecException {
List<Tuple> value = map.get(key);
if (value == null) {
map.put(key, inpTuple);
} else {
value.add(inpTuple.get(0));
}
}
private void addKeyValToMap(Map<Object, List<Tuple>> map,
Object key, Tuple inpTuple) throws ExecException {
List<Tuple> value = map.get(key);
if (value == null) {
value = createNewValueList(map);
map.put(key, value);
}
value.add(inpTuple);
if (value.size() > listSizeThreshold) {
boolean isFirst = (map == rawInputMap);
if (LOG.isDebugEnabled()){
LOG.debug("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
}
if (isFirst) {
// Aggregate and remove just this key to keep size in check
aggregateRawRow(key, value);
} else {
aggregateSecondLevel();
}
}
}
private List<Tuple> createNewValueList(Map<Object, List<Tuple>> map) {
List<Tuple> value;
if (isGroupAll) {
// Set exact array initial size to avoid array copies
// listSizeThreshold = numRecordsToSample before estimating memory
// thresholds and firstTierThreshold after memory estimation
int listSize = (map == rawInputMap) ? listSizeThreshold : Math.min(secondTierThreshold + 1, MAX_LIST_SIZE);
value = new ArrayList<Tuple>(listSize);
} else {
value = new ArrayList<Tuple>();
}
return value;
}
private void startSpill(boolean aggregate) throws ExecException {
// If spillingIterator is not null, we are already spilling and don't need to set up.
if (spillingIterator != null) return;
LOG.info("Starting spill.");
if (aggregate) {
aggregateBothLevels(false, false);
}
doSpill = true;
spillingIterator = processedInputMap.entrySet().iterator();
}
private Result spillResult() throws ExecException {
// if no more to spill, return EOP_RESULT.
if (processedInputMap.isEmpty()) {
spillingIterator = null;
LOG.info("In spillResults(), processed map is empty -- done spilling.");
return EOP_RESULT;
} else {
Map.Entry<Object, List<Tuple>> entry = spillingIterator.next();
Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
numRecsInProcessedMap -= entry.getValue().size();
spillingIterator.remove();
Result res = getOutput(entry.getKey(), valueTuple);
return res;
}
}
private void aggregateRawRow(Object key, List<Tuple> value) throws ExecException {
numRecsInRawMap -= value.size();
Tuple valueTuple = createValueTuple(key, value);
Result res = getOutput(key, valueTuple);
rawInputMap.remove(key);
addKeyValToMap(processedInputMap, key, getAggResultTuple(res.result));
numRecsInProcessedMap++;
}
/**
* For each entry in fromMap, feed the list of tuples into the aggregator funcs
* and add the results to toMap, removing the entries from fromMap as we go.
* When fromMap and toMap are the same map (processedInputMap), entries are
* re-aggregated in place instead.
* @return numEntriesInTarget incremented by one for each entry of fromMap processed
* @throws ExecException
*/
private int aggregate(Map<Object, List<Tuple>> fromMap, Map<Object, List<Tuple>> toMap, int numEntriesInTarget) throws ExecException {
boolean srcDestDifferent = (fromMap == toMap) ? false : true;
Iterator<Map.Entry<Object, List<Tuple>>> iter = fromMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Object, List<Tuple>> entry = iter.next();
if (entry.getValue().size() == 1) {
// If fromMap and toMap are same (processedInputMap), then continue without any change
// If different (rawInputMap->processedInputMap), add directly to target and skip running through the valuePlans
if (srcDestDifferent) {
iter.remove();
addKeySingleValToMap(toMap, entry.getKey(), entry.getValue());
}
} else {
Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
Result res = getOutput(entry.getKey(), valueTuple);
if (srcDestDifferent) {
// Remove from src and add to destination (rawInputMap->processedInputMap)
iter.remove();
addKeyValToMap(toMap, entry.getKey(), getAggResultTuple(res.result));
} else {
// Update processedInputMap in place
List<Tuple> value = entry.getValue();
if (isGroupAll) {
value.clear();
} else {
// Create a new (small) list instead of calling clear(), to release the old
// backing array since the same key might not repeat again
value = createNewValueList(toMap);
toMap.put(entry.getKey(), value);
}
value.add(getAggResultTuple(res.result));
}
}
numEntriesInTarget++;
}
return numEntriesInTarget;
}
private void aggregateBothLevels(boolean checkThresholdForFirst,
boolean checkThresholdForSecond) throws ExecException {
// When processed map is initially empty, just aggregate first level as
// aggregating second level immediately would not yield anything
boolean aggregateSecondLevel = !processedInputMap.isEmpty();
if (!checkThresholdForFirst || shouldAggregateFirstLevel()) {
aggregateFirstLevel();
}
if (aggregateSecondLevel && (!checkThresholdForSecond || shouldAggregateSecondLevel())) {
aggregateSecondLevel();
}
}
private void aggregateFirstLevel() throws ExecException {
if (rawInputMap.isEmpty()) {
return;
}
int rawTuples = numRecsInRawMap;
int processedTuples = numRecsInProcessedMap;
numRecsInProcessedMap = aggregate(rawInputMap, processedInputMap, numRecsInProcessedMap);
numRecsInRawMap = 0;
LOG.info("Aggregated " + rawTuples+ " raw tuples."
+ " Processed tuples before aggregation = " + processedTuples
+ ", after aggregation = " + numRecsInProcessedMap);
}
private void aggregateSecondLevel() throws ExecException {
if (processedInputMap.isEmpty()) {
return;
}
int processedTuples = numRecsInProcessedMap;
numRecsInProcessedMap = aggregate(processedInputMap, processedInputMap, 0);
LOG.info("Aggregated " + processedTuples + " processed tuples to " + numRecsInProcessedMap + " tuples");
}
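// Packages the buffered tuples for one key into the shape the value plans expect:
// a tuple (key, bag_1, ..., bag_n) with one bag per value plan, where bag_i collects
// field i of every buffered tuple. The bags are InternalCachedBags, so they can spill
// themselves to disk if they grow too large.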
private Tuple createValueTuple(Object key, List<Tuple> inpTuples) throws ExecException {
Tuple valueTuple = mTupleFactory.newTuple(valuePlans.size() + 1);
valueTuple.set(0, key);
for (int i = 0; i < valuePlans.size(); i++) {
DataBag bag = null;
if (doContingentSpill) {
// Don't use additional memory since we already have memory stress
bag = new InternalCachedBag();
} else {
// Take 10% of memory; needs fine tuning later
bag = new InternalCachedBag(1, 0.1F);
}
valueTuple.set(i + 1, bag);
}
for (Tuple t : inpTuples) {
for (int i = 1; i < t.size(); i++) {
DataBag bag = (DataBag) valueTuple.get(i);
bag.add((Tuple) t.get(i));
}
}
return valueTuple;
}
private Tuple getAggResultTuple(Object result) throws ExecException {
try {
return (Tuple) result;
} catch (ClassCastException ex) {
throw new ExecException("Intermediate Algebraic "
+ "functions must implement EvalFunc<Tuple>");
}
}
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
// combiner optimizer does not get invoked if the plan is being executed
// under illustrate, so POPartialAgg should not get used in that case
throw new UnsupportedOperationException();
}
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitPartialAgg(this);
}
private Result getResult(ExpressionOperator op) throws ExecException {
Result res;
switch (op.getResultType()) {
case DataType.BAG:
case DataType.BOOLEAN:
case DataType.BYTEARRAY:
case DataType.CHARARRAY:
case DataType.DOUBLE:
case DataType.FLOAT:
case DataType.INTEGER:
case DataType.LONG:
case DataType.BIGINTEGER:
case DataType.BIGDECIMAL:
case DataType.DATETIME:
case DataType.MAP:
case DataType.TUPLE:
res = op.getNext(op.getResultType());
break;
default:
String msg = "Invalid result type: "
+ DataType.findType(op.getResultType());
throw new ExecException(msg, 2270, PigException.BUG);
}
return res;
}
/**
* Runs the provided key-value pair through the aggregator plans.
* @param key
* @param value
* @return Result, containing a tuple of form (key, tupleReturnedByPlan1, tupleReturnedByPlan2, ...)
* @throws ExecException
*/
private Result getOutput(Object key, Tuple value) throws ExecException {
Tuple output = mTupleFactory.newTuple(valuePlans.size() + 1);
output.set(0, key);
for (int i = 0; i < valuePlans.size(); i++) {
valuePlans.get(i).attachInput(value);
Result valRes = getResult(valueLeaves.get(i));
if (valRes.returnStatus == POStatus.STATUS_ERR) {
return valRes;
}
output.set(i + 1, valRes.result);
}
return new Result(POStatus.STATUS_OK, output);
}
@Override
public boolean supportsMultipleInputs() {
return false;
}
@Override
public boolean supportsMultipleOutputs() {
return false;
}
@Override
public String name() {
return getAliasString() + "Partial Agg" + "["
+ DataType.findTypeName(resultType) + "]" + mKey.toString();
}
public PhysicalPlan getKeyPlan() {
return keyPlan;
}
public void setKeyPlan(PhysicalPlan keyPlan) {
this.keyPlan = keyPlan;
keyLeaf = (ExpressionOperator) keyPlan.getLeaves().get(0);
}
public List<PhysicalPlan> getValuePlans() {
return valuePlans;
}
public void setValuePlans(List<PhysicalPlan> valuePlans) {
this.valuePlans = valuePlans;
valueLeaves = new ArrayList<ExpressionOperator>();
for (PhysicalPlan plan : valuePlans) {
valueLeaves.add((ExpressionOperator) plan.getLeaves().get(0));
}
}
@Override
public long spill() {
if (mapAggDisabled()) {
return 0;
} else {
if (doContingentSpill && !startedContingentSpill) {
LOG.info("Spill triggered by SpillableMemoryManager, but previous spill call is still not processed. Skipping");
return 0;
}
synchronized(spillLock) {
if (rawInputMap != null) {
LOG.info("Spill triggered. Memory usage: " + getMemorySize()
+ ". Raw map: keys = " + rawInputMap.size()
+ ", tuples = "+ numRecsInRawMap
+ ", Processed map: keys = " + processedInputMap.size()
+ ", tuples = "+ numRecsInProcessedMap );
}
startedContingentSpill = false;
doContingentSpill = true;
if (!sizeReductionChecked || !estimatedMemThresholds) {
numRecordsToSample = numRecsInRawMap;
}
try {
// Block till spilling is finished. If the main thread has not reached POPartialAgg
// and is still processing the lower pipeline after more than 5 seconds, the
// JVM is stuck doing GC and will soon fail with java.lang.OutOfMemoryError: GC overhead limit exceeded.
// So exit out of here so that SpillableMemoryManager can at least spill
// other Spillable bags and free up some memory for user code to be able to run
// and reach POPartialAgg for the aggregation/spilling of the hashmaps to happen.
long startTime = System.currentTimeMillis();
while (doContingentSpill) {
Thread.sleep(25);
if (!startedContingentSpill && (System.currentTimeMillis() - startTime) >= 5000) {
break;
}
}
if (doContingentSpill) {
LOG.info("Not blocking for spill and letting SpillableMemoryManager"
+ " process other spillable objects as main thread has not reached here for 5 secs");
} else {
LOG.info("Finished spill for SpillableMemoryManager call");
return 1;
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting for spill to finish", e);
}
return 0;
}
}
}
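// Rough memory estimate reported to the SpillableMemoryManager: the average tuple size
// observed during sampling times the number of tuples currently buffered in both maps.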
@Override
public long getMemorySize() {
return avgTupleSize * (numRecsInProcessedMap + numRecsInRawMap);
}
@Override
public PhysicalOperator clone() throws CloneNotSupportedException {
POPartialAgg clone = (POPartialAgg) super.clone();
clone.setKeyPlan(keyPlan.clone());
clone.setValuePlans(clonePlans(valuePlans));
return clone;
}
}