blob: 08cbcc925b4b113495c2fca8ee7105cd359c1855 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.data;
import java.io.BufferedOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
import org.apache.pig.impl.util.BagFormat;
import org.apache.pig.impl.util.SpillableMemoryManager;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
 * Default implementation of DataBag.  This is an abstract class used as a
 * parent for all three of the types of data bags.
 */
@SuppressWarnings("serial")
public abstract class DefaultAbstractBag implements DataBag {

    // NOTE(review): the logger is registered under DataBag.class rather than
    // this class -- possibly intentional so every bag implementation shares
    // one log category; confirm before changing.
    private static final Log log = LogFactory.getLog(DataBag.class);

    // If we grow past 100K, may be worthwhile to register.
    // Once the estimated in-memory size reaches this many bytes, the bag
    // registers itself with the SpillableMemoryManager so it can be asked
    // to spill to disk under memory pressure.
    private static final int SPILL_REGISTER_THRESHOLD = 100 * 1024;

    // Cached from PhysicalOperator in warn(); see warn() below.
    private static PigLogger pigLogger;

    // Serializer used by write(); shared across all bag instances.
    private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();

    // Container that holds the tuples. Actual object instantiated by
    // subclasses.
    protected Collection<Tuple> mContents;

    // Spill files we've created. These need to be removed in finalize.
    protected FileList mSpillFiles;

    // Total size, including tuples on disk. Stored here so we don't have
    // to run through the disk when people ask.
    protected long mSize = 0;

    // Number of tuples to sample per bag, to get an estimate of tuple size
    private static final int SPILL_SAMPLE_SIZE = 100;
    // Only every SPILL_SAMPLE_FREQUENCYth tuple is actually measured.
    private static final int SPILL_SAMPLE_FREQUENCY = 10;

    // Running sum of getMemorySize() over the tuples sampled so far.
    long aggSampleTupleSize = 0;
    // Count of tuples sampled so far (capped at SPILL_SAMPLE_SIZE).
    int sampled = 0;

    // True once this bag has registered with the SpillableMemoryManager;
    // registration happens at most once per bag.
    private boolean spillableRegistered = false;
/**
* Get the number of elements in the bag, both in memory and on disk.
*/
@Override
public long size() {
return mSize;
}
    /**
     * Sample every SPILL_SAMPLE_FREQUENCYth tuple
     * until we reach a max of SPILL_SAMPLE_SIZE
     * to get an estimate of the tuple sizes.
     */
    protected void sampleContents() {
        synchronized (mContents) {
            Iterator<Tuple> iter = mContents.iterator();
            // Skip the prefix of the bag that earlier calls already covered:
            // each of the 'sampled' samples accounted for
            // SPILL_SAMPLE_FREQUENCY tuples.
            for (int i = 0; i < sampled * SPILL_SAMPLE_FREQUENCY && iter.hasNext(); i++) {
                iter.next();
            }
            // Resume sampling where we left off: 'i' continues the global
            // tuple position (mod SPILL_SAMPLE_FREQUENCY), so one tuple out
            // of every SPILL_SAMPLE_FREQUENCY is measured until the cap.
            for (int i = sampled; iter.hasNext() && sampled < SPILL_SAMPLE_SIZE; i++) {
                Tuple t = iter.next();
                // null tuples contribute nothing and do not advance 'sampled'
                if (t != null && i % SPILL_SAMPLE_FREQUENCY == 0) {
                    aggSampleTupleSize += t.getMemorySize();
                    sampled += 1;
                }
            }
        }
    }
/**
* Add a tuple to the bag.
* @param t tuple to add.
*/
@Override
public void add(Tuple t) {
synchronized (mContents) {
mSize++;
mContents.add(t);
}
markSpillableIfNecessary();
}
/**
* All bag implementations that can get big enough to be spilled
* should call this method after every time they add an element.
*/
protected void markSpillableIfNecessary() {
if (!spillableRegistered) {
long estimate = getMemorySize();
if ( estimate >= SPILL_REGISTER_THRESHOLD) {
SpillableMemoryManager.getInstance().registerSpillable(this);
spillableRegistered = true;
}
}
}
    /**
     * Append every tuple of another bag to this bag.
     *
     * @param b bag whose contents are added; delegated to
     *          {@link #addAll(Iterable)}.
     */
    @Override
    public void addAll(DataBag b) {
        addAll((Iterable<Tuple>) b);
    }
public void addAll(Collection<Tuple> c) {
addAll((Iterable<Tuple>) c);
}
/**
* Add contents of an iterable (a collection or a DataBag)
*
* @param iterable a Collection or DataBag to add contents of
*/
public void addAll(Iterable<Tuple> iterable) {
synchronized (mContents) {
for (Tuple t : iterable) {
add(t);
}
}
}
    /**
     * Return the size of memory usage.
     * The estimate is (average sampled tuple size) * (tuples in memory)
     * plus fixed object overheads computed in totalSizeFromAvgTupleSize.
     */
    @Override
    public long getMemorySize() {
        int numInMem = 0;
        synchronized (mContents) {
            numInMem = mContents.size();
            // If we've already gotten the estimate
            // and the number of tuples hasn't changed, or was above
            // the sample size and is still above the sample size, we can
            // produce a new estimate without sampling the tuples again.
            if (sampled != 0 && (sampled == numInMem ||
                    sampled > SPILL_SAMPLE_SIZE && numInMem > SPILL_SAMPLE_SIZE)) {
                return totalSizeFromAvgTupleSize(aggSampleTupleSize/sampled, numInMem);
            }
            // Otherwise refresh the running average by sampling more tuples.
            sampleContents();
            int avgTupleSize;
            if (sampled != 0) {
                avgTupleSize = (int) (aggSampleTupleSize / sampled);
            } else {
                // Nothing could be sampled (e.g. empty bag); treat tuples
                // as size 0 so only the fixed overheads are reported.
                avgTupleSize = 0;
            }
            return totalSizeFromAvgTupleSize(avgTupleSize, numInMem);
        }
    }
    /**
     * Compute the total estimated memory footprint of this bag from the
     * average tuple size and the number of tuples currently in memory.
     * The constants below are hand-tuned against a 32-bit HotSpot VM
     * object layout; treat them as empirical, not exact.
     *
     * @param avgTupleSize average per-tuple size in bytes (from sampling)
     * @param numInMem number of tuples currently held in mContents
     * @return estimated bytes used, including container and spill-file
     *         bookkeeping overhead
     */
    private long totalSizeFromAvgTupleSize(long avgTupleSize, int numInMem) {
        long used = avgTupleSize * numInMem;
        long mFields_size = roundToEight(4 + numInMem*4); /* mContents fixed + per entry */
        // in java hotspot 32bit vm, there seems to be a minimum bag size of 188 bytes
        // some of the extra bytes is probably from a minimum size of this array list
        mFields_size = Math.max(40, mFields_size);
        // the fixed overhead for this object and other object variables = 84 bytes
        // 8 - object header
        // 4 + 8 + 8 - sampled + aggSampleTupleSize + mSize
        // 8 + 8 - mContents ref + mSpillFiles ref
        // 4 - spillableRegistered +4 instead of 1 to round it to eight
        // 36 - mContents fixed
        used += 84 + mFields_size;
        // add up overhead for mSpillFiles ArrayList, Object[] inside ArrayList,
        // object variable inside ArrayList and references to spill files
        if (mSpillFiles != null) {
            used += roundToEight(36 /* mSpillFiles fixed overhead*/ + mSpillFiles.size()*4);
            if(mSpillFiles.size() > 0){
                //a rough estimate of memory used by each file entry
                // the auto generated files are likely to have same length
                long approx_per_entry_size =
                        roundToEight(mSpillFiles.get(0).toString().length() * 2 + 38);
                used += mSpillFiles.size() * approx_per_entry_size;
            }
        }
        return used;
    }
/**
* Memory size of objects are rounded to multiple of 8 bytes
* @param i
* @return i rounded to a equal of higher multiple of 8
*/
private long roundToEight(long i) {
return 8 * ((i+7)/8); // integer division rounds the result down
}
/**
* Clear out the contents of the bag, both on disk and in memory.
* Any attempts to read after this is called will produce undefined
* results.
*/
@Override
public void clear() {
synchronized (mContents) {
mContents.clear();
if (mSpillFiles != null) {
for (int i = 0; i < mSpillFiles.size(); i++) {
boolean res = mSpillFiles.get(i).delete();
if (!res)
warn ("DefaultAbstractBag.clear: failed to delete " + mSpillFiles.get(i), PigWarning.DELETE_FAILED, null);
}
mSpillFiles.clear();
}
mSize = 0;
aggSampleTupleSize = 0;
sampled = 0;
// not changing spillableRegistered -- clear doesn't change that.
}
}
/**
* This method is potentially very expensive since it may require a
* sort of the bag; don't call it unless you have to.
*/
@Override
@SuppressWarnings("unchecked")
public int compareTo(Object other) {
if (this == other)
return 0;
if (other instanceof DataBag) {
DataBag bOther = (DataBag) other;
if (this.size() != bOther.size()) {
if (this.size() > bOther.size()) return 1;
else return -1;
}
// Ugh, this is bogus. But I have to know if two bags have the
// same tuples, regardless of order. Hopefully most of the
// time the size check above will prevent this.
// If either bag isn't already sorted, create a sorted bag out
// of it so I can guarantee order.
DataBag thisClone;
DataBag otherClone;
BagFactory factory = BagFactory.getInstance();
if (this.isSorted() || this.isDistinct()) {
thisClone = this;
} else {
thisClone = factory.newSortedBag(null);
Iterator<Tuple> i = iterator();
while (i.hasNext()) thisClone.add(i.next());
}
if (((DataBag) other).isSorted() || ((DataBag)other).isDistinct()) {
otherClone = bOther;
} else {
otherClone = factory.newSortedBag(null);
Iterator<Tuple> i = bOther.iterator();
while (i.hasNext()) otherClone.add(i.next());
}
Iterator<Tuple> thisIt = thisClone.iterator();
Iterator<Tuple> otherIt = otherClone.iterator();
while (thisIt.hasNext() && otherIt.hasNext()) {
Tuple thisT = thisIt.next();
Tuple otherT = otherIt.next();
int c = thisT.compareTo(otherT);
if (c != 0) return c;
}
return 0; // if we got this far, they must be equal
} else {
return DataType.compare(this, other);
}
}
@Override
public boolean equals(Object other) {
if( other == null ) {
return false;
}
return compareTo(other) == 0;
}
/**
* Write a bag's contents to disk.
* @param out DataOutput to write data to.
* @throws IOException (passes it on from underlying calls).
*/
@Override
public void write(DataOutput out) throws IOException {
sedes.writeDatum(out, this);
}
/**
* Read a bag from disk.
* @param in DataInput to read data from.
* @throws IOException (passes it on from underlying calls).
*/
@Override
public void readFields(DataInput in) throws IOException {
long size = in.readLong();
for (long i = 0; i < size; i++) {
try {
Object o = sedes.readDatum(in);
add((Tuple)o);
} catch (ExecException ee) {
throw ee;
}
}
}
    /**
     * This is used by FuncEvalSpec.FakeDataBag.
     * @param stale Set stale state.
     */
    @Override
    public void markStale(boolean stale)
    {
        // Intentionally empty: this base class keeps no stale state.
    }
    /**
     * Write the bag into a string.
     * Formatting is delegated entirely to BagFormat.
     */
    @Override
    public String toString() {
        return BagFormat.format(this);
    }
@Override
public int hashCode() {
int hash = 0;
Iterator<Tuple> i = iterator();
while (i.hasNext()) {
hash += i.next().hashCode();
}
return hash;
}
    /**
     * Get a file to spill contents to. The file will be registered in the
     * mSpillFiles array.
     * @return stream to write tuples to.
     * @throws IOException if the temp directory cannot be created or the
     *         spill file cannot be opened
     */
    protected DataOutputStream getSpillFile() throws IOException {
        if (mSpillFiles == null) {
            // We want to keep the list as small as possible.
            mSpillFiles = new FileList(1);
        }

        String tmpDirName = System.getProperties().getProperty("java.io.tmpdir");
        File tmpDir = new File(tmpDirName);

        // if the directory does not exist, create it.
        if (!tmpDir.exists()) {
            log.info("Temporary directory doesn't exists. Trying to create: " + tmpDir.getAbsolutePath());
            // Create the directory and see if it was successful
            if (tmpDir.mkdir()) {
                log.info("Successfully created temporary directory: " + tmpDir.getAbsolutePath());
            } else {
                // If execution reaches here, it means that we needed to create the directory but
                // were not successful in doing so.
                //
                // If this directory is created recently then we can simply
                // skip creation. This is to address a rare issue occuring in a cluster despite the
                // the fact that spill() makes call to getSpillFile() in a synchronized
                // block.
                if (tmpDir.exists()) {
                    log.info("Temporary directory already exists: " + tmpDir.getAbsolutePath());
                } else {
                    int errCode = 2111;
                    String msg = "Unable to create temporary directory: " + tmpDir.getAbsolutePath();
                    throw new ExecException(msg, errCode, PigException.BUG);
                }
            }
        }

        // createTempFile also places the file under java.io.tmpdir, which is
        // why the directory's existence is ensured above.  deleteOnExit is a
        // backstop; clear() deletes spill files eagerly.
        File f = File.createTempFile("pigbag", null);
        f.deleteOnExit();
        mSpillFiles.add(f);
        return new DataOutputStream(new BufferedOutputStream(
                new FileOutputStream(f)));
    }
    /**
     * Report progress to HDFS.
     */
    protected void reportProgress() {
        // NOTE(review): the reporter is fetched twice; if another thread
        // clears PhysicalOperator's reporter between the null check and the
        // progress() call this could NPE.  Caching it in a local would close
        // the window -- confirm the reporter's declared type (not imported
        // here) before making that change.
        if (PhysicalOperator.getReporter() != null) {
            PhysicalOperator.getReporter().progress();
        }
    }
@SuppressWarnings("rawtypes")
protected void warn(String msg, Enum warningEnum, Throwable e) {
pigLogger = PhysicalOperator.getPigLogger();
if(pigLogger != null) {
pigLogger.warn(this, msg, warningEnum);
} else {
log.warn(msg, e);
}
}
@SuppressWarnings("rawtypes")
protected void incSpillCount(Enum counter) {
incSpillCount(counter, 1);
}
@SuppressWarnings("rawtypes")
protected void incSpillCount(Enum counter, long numRecsSpilled) {
PigStatusReporter reporter = PigStatusReporter.getInstance();
if (reporter != null && reporter.getCounter(counter)!=null) {
reporter.getCounter(counter).increment(numRecsSpilled);
} else {
PigHadoopLogger.getInstance().warn(mContents, "Spill counter incremented", counter);
}
}
    // Marker tuple types used as bag delimiters; they carry no data of
    // their own.  NOTE(review): usage sites are outside this file --
    // presumably the tuple serialization path; confirm before relying on
    // these semantics.
    public static abstract class BagDelimiterTuple extends DefaultTuple{}

    // Marks the start of a bag.
    public static class StartBag extends BagDelimiterTuple{
        private static final long serialVersionUID = 1L;}

    // Marks the end of a bag.
    public static class EndBag extends BagDelimiterTuple{
        private static final long serialVersionUID = 1L;}

    // Shared singleton delimiters: the marker classes are stateless, so a
    // single instance of each suffices.
    public static final Tuple startBag = new StartBag();
    public static final Tuple endBag = new EndBag();

    // Upper bound on spill files a bag may accumulate.
    protected static final int MAX_SPILL_FILES = 100;
}