blob: 697d55c4e3ac9036d5c58e097d622ab5d2468fa9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.data;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.ArrayList;
import org.apache.pig.impl.util.Spillable;
import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A collection of Tuples. A DataBag may or may not fit into memory.
* DataBag extends spillable, which means that it registers with a memory
* manager. By default, it attempts to keep all of its contents in memory.
* If it is asked by the memory manager to spill to disk (by a call to
* spill()), it takes whatever it has in memory, opens a spill file, and
* writes the contents out. This may happen multiple times. The bag
* tracks all of the files it's spilled to.
*
* DataBag provides an Iterator interface, that allows callers to read
* through the contents. The iterators are aware of the data spilling.
* They have to be able to handle reading from files, as well as the fact
* that data they were reading from memory may have been spilled to disk
* underneath them.
*
* The DataBag interface assumes that all data is written before any is
* read. That is, a DataBag cannot be used as a queue. If data is written
* after data is read, the results are undefined. This condition is not
* checked on each add or read, for reasons of speed. Caveat emptor.
*
* Since spills are asynchronous (the memory manager requesting a spill
* runs in a separate thread), all operations dealing with the mContents
* Collection (which is the collection of tuples contained in the bag) have
* to be synchronized. This means that reading from a DataBag is currently
* serialized. This is ok for the moment because pig execution is
* currently single threaded. A ReadWriteLock was experimented with, but
* it was found to be about 10x slower than using the synchronize keyword.
* If pig changes its execution model to be multithreaded, we may need to
* return to this issue, as synchronizing reads will most likely defeat the
* purpose of multi-threading execution.
*
* DataBags come in several types: default, sorted, and distinct. The type
* must be chosen up front, there is no way to convert a bag on the fly.
*/
public abstract class DataBag extends Datum implements Spillable, Iterable<Tuple> {
    private static final Log log = LogFactory.getLog(DataBag.class);

    // Container that holds the tuples. Actual object instantiated by
    // subclasses (e.g. a list for default bags, a sorted structure for
    // sorted bags).
    protected Collection<Tuple> mContents;

    // Spill files we've created. These need to be removed in finalize.
    protected ArrayList<File> mSpillFiles;

    // Total size, including tuples on disk. Stored here so we don't have
    // to run through the disk when people ask.
    protected long mSize = 0;

    // True when tuples have been added since the last getMemorySize()
    // call, i.e. the cached mMemSize estimate is stale.
    protected boolean mMemSizeChanged = false;

    // Cached result of the last getMemorySize() estimate.
    protected long mMemSize = 0;

    /**
     * Get the number of elements in the bag, both in memory and on disk.
     * @return total number of tuples ever added to this bag.
     */
    public long size() {
        return mSize;
    }

    /**
     * Deprecated. Use {@link #size()} instead.
     * Note the narrowing cast: the result is wrong for bags holding more
     * than Integer.MAX_VALUE tuples.
     */
    @Deprecated
    public int cardinality() {
        return (int)size();
    }

    /**
     * Find out if the bag is sorted.
     */
    public abstract boolean isSorted();

    /**
     * Find out if the bag is distinct.
     */
    public abstract boolean isDistinct();

    /**
     * Get an iterator to the bag. For default and distinct bags,
     * no particular order is guaranteed. For sorted bags the order
     * is guaranteed to be sorted according
     * to the provided comparator.
     */
    public abstract Iterator<Tuple> iterator();

    /**
     * Deprecated. Use {@link #iterator()} instead.
     */
    @Deprecated
    public Iterator<Tuple> content() {
        return iterator();
    }

    /**
     * Add a tuple to the bag.
     * Synchronized on mContents because spills run asynchronously in the
     * memory manager's thread (see class comment).
     * @param t tuple to add.
     */
    public void add(Tuple t) {
        synchronized (mContents) {
            mMemSizeChanged = true;
            mSize++;
            mContents.add(t);
        }
    }

    /**
     * Add contents of a bag to the bag.
     * @param b bag to add contents of.
     */
    public void addAll(DataBag b) {
        synchronized (mContents) {
            mMemSizeChanged = true;
            mSize += b.size();
            for (Tuple t : b) {
                mContents.add(t);
            }
        }
    }

    // Do I need remove? I couldn't find it used anywhere.

    /**
     * Return an estimate of the in-memory size of the bag, in bytes.
     * The estimate is cached and only recomputed after the bag has
     * changed (mMemSizeChanged).
     * @return estimated number of bytes used by in-memory tuples.
     */
    @Override
    public long getMemorySize() {
        if (!mMemSizeChanged) return mMemSize;

        long used = 0;
        // I can't afford to walk through all the tuples every time the
        // memory manager wants to know if it's time to dump. Just sample
        // the first 100 and see what we get. This may not be 100%
        // accurate, but it's just an estimate anyway.
        int j;
        int numInMem = 0;
        synchronized (mContents) {
            numInMem = mContents.size();
            // Measure only what's in memory, not what's on disk.
            Iterator<Tuple> i = mContents.iterator();
            for (j = 0; i.hasNext() && j < 100; j++) {
                used += i.next().getMemorySize();
                used += REF_SIZE;
            }
        }

        if (numInMem > 100) {
            // Estimate the per tuple size. Do it in integer arithmetic
            // (even though it will be slightly less accurate) for speed.
            // j is guaranteed to be 100 here, so no division by zero.
            used /= j;
            used *= numInMem;
        }

        mMemSize = used;
        mMemSizeChanged = false;
        return used;
    }

    /**
     * Clear out the contents of the bag, both on disk and in memory.
     * Any attempts to read after this is called will produce undefined
     * results.
     */
    public void clear() {
        synchronized (mContents) {
            mContents.clear();
            if (mSpillFiles != null) {
                for (int i = 0; i < mSpillFiles.size(); i++) {
                    mSpillFiles.get(i).delete();
                }
                mSpillFiles.clear();
            }
            mSize = 0;
        }
    }

    /**
     * Compare this bag to another datum.
     * This method is potentially very expensive since it may require a
     * sort of the bag; don't call it unless you have to.
     *
     * Ordering: bags compare first by size; equal-sized bags are compared
     * element-wise in sorted order. A bag sorts after a DataAtom and
     * before a Tuple (and before anything unrecognized).
     * @param other datum to compare to.
     * @return negative, zero, or positive per the Comparable contract.
     */
    public int compareTo(Object other) {
        // Do we really need to be able to compare to DataAtom and Tuple?
        // When does that happen?
        if (this == other)
            return 0;

        if (other instanceof DataBag) {
            DataBag bOther = (DataBag) other;
            if (this.size() != bOther.size()) {
                if (this.size() > bOther.size()) return 1;
                else return -1;
            }

            // Ugh, this is bogus. But I have to know if two bags have the
            // same tuples, regardless of order. Hopefully most of the
            // time the size check above will prevent this.
            // If either bag isn't already sorted, create a sorted bag out
            // of it so I can guarantee order.
            DataBag thisClone;
            DataBag otherClone;
            if (this instanceof SortedDataBag ||
                    this instanceof DistinctDataBag) {
                thisClone = this;
            } else {
                thisClone = new SortedDataBag(null);
                Iterator<Tuple> i = iterator();
                while (i.hasNext()) thisClone.add(i.next());
            }
            // Bug fix: this used to test "this instanceof DistinctDataBag",
            // a copy-paste error that decided whether to sort the OTHER bag
            // based on THIS bag's type, yielding order-dependent results.
            if (bOther instanceof SortedDataBag ||
                    bOther instanceof DistinctDataBag) {
                otherClone = bOther;
            } else {
                otherClone = new SortedDataBag(null);
                Iterator<Tuple> i = bOther.iterator();
                while (i.hasNext()) otherClone.add(i.next());
            }

            // Both iterators now yield tuples in sorted order; compare
            // pairwise. Sizes are equal, so the loop covers both fully.
            Iterator<Tuple> thisIt = thisClone.iterator();
            Iterator<Tuple> otherIt = otherClone.iterator();
            while (thisIt.hasNext() && otherIt.hasNext()) {
                Tuple thisT = thisIt.next();
                Tuple otherT = otherIt.next();

                int c = thisT.compareTo(otherT);
                if (c != 0) return c;
            }

            return 0;   // if we got this far, they must be equal
        } else if (other instanceof DataAtom) {
            return +1;
        } else if (other instanceof Tuple) {
            return -1;
        } else {
            return -1;
        }
    }

    @Override
    public boolean equals(Object other) {
        return compareTo(other) == 0;
    }

    /**
     * Write a bag's contents to disk.
     * @param out DataOutput to write data to.
     * @throws IOException (passes it on from underlying calls).
     */
    @Override
    public void write(DataOutput out) throws IOException {
        // We don't care whether this bag was sorted or distinct because
        // using the iterator to write it will guarantee those things come
        // correctly. And on the other end there'll be no reason to waste
        // time re-sorting or re-applying distinct.
        out.write(BAG);
        out.writeLong(size());
        for (Tuple t : this) {
            t.write(out);
        }
    }

    /**
     * Read a bag from disk. Expects the tuple count (a long) followed by
     * that many serialized tuples; the BAG type marker written by
     * {@link #write(DataOutput)} must already have been consumed by the
     * caller.
     * @param in DataInput to read data from.
     * @throws IOException (passes it on from underlying calls).
     */
    static DataBag read(DataInput in) throws IOException {
        long size = in.readLong();
        // Always use a default data bag, as if it was sorted or distinct
        // we're guaranteed it was written out that way already, and we
        // don't need to mess with it.
        DataBag ret = BagFactory.getInstance().newDefaultBag();
        for (long i = 0; i < size; i++) {
            Tuple t = new Tuple();
            t.readFields(in);
            ret.add(t);
        }
        return ret;
    }

    /**
     * This is used by FuncEvalSpec.FakeDataBag.
     * No-op here; subclasses that care override it.
     * @param stale Set stale state.
     */
    public void markStale(boolean stale)
    {
    }

    /**
     * Write the bag into a string, e.g. {@code {(a, b), (c, d)}}.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append('{');
        Iterator<Tuple> it = iterator();
        while ( it.hasNext() ) {
            Tuple t = it.next();
            String s = t.toString();
            sb.append(s);
            if (it.hasNext()) sb.append(", ");
        }
        sb.append('}');
        return sb.toString();
    }

    @Override
    public int hashCode() {
        int hash = 1;
        Iterator<Tuple> i = iterator();
        while (i.hasNext()) {
            // Use 37 because we want a prime, and tuple uses 31.
            hash = 37 * hash + i.next().hashCode();
        }
        return hash;
    }

    /**
     * Need to override finalize to clean out the mSpillFiles array.
     * Spill files are also registered with deleteOnExit in
     * {@link #getSpillFile()}, so this is a belt-and-suspenders cleanup
     * for bags collected before JVM shutdown.
     */
    @Override
    protected void finalize() {
        if (mSpillFiles != null) {
            for (int i = 0; i < mSpillFiles.size(); i++) {
                mSpillFiles.get(i).delete();
            }
        }
    }

    /**
     * Get a file to spill contents to. The file will be registered in the
     * mSpillFiles array.
     * @return stream to write tuples to.
     * @throws IOException if the temporary directory cannot be created.
     */
    protected DataOutputStream getSpillFile() throws IOException {
        if (mSpillFiles == null) {
            // We want to keep the list as small as possible.
            mSpillFiles = new ArrayList<File>(1);
        }

        String tmpDirName = System.getProperties().getProperty("java.io.tmpdir");
        File tmpDir = new File(tmpDirName);

        // if the directory does not exist, create it.
        if (!tmpDir.exists()) {
            log.info("Temporary directory doesn't exists. Trying to create: " + tmpDir.getAbsolutePath());
            // Create the directory and see if it was successful
            if (tmpDir.mkdir()) {
                log.info("Successfully created temporary directory: " + tmpDir.getAbsolutePath());
            } else {
                // If execution reaches here, it means that we needed to create the directory but
                // were not successful in doing so.
                //
                // If this directory was created recently (by another bag racing
                // with us) then we can simply skip creation. This is to address
                // a rare issue occurring in a cluster despite the fact that
                // spill() makes the call to getSpillFile() in a synchronized
                // block.
                if (tmpDir.exists()) {
                    log.info("Temporary directory already exists: " + tmpDir.getAbsolutePath());
                } else {
                    log.error("Unable to create temporary directory: " + tmpDir.getAbsolutePath());
                    throw new IOException("Unable to create temporary directory: " + tmpDir.getAbsolutePath() );
                }
            }
        }

        File f = File.createTempFile("pigbag", null);
        f.deleteOnExit();
        mSpillFiles.add(f);
        return new DataOutputStream(new BufferedOutputStream(
            new FileOutputStream(f)));
    }

    /**
     * Report progress to HDFS so long-running spills/reads don't get the
     * task killed. No-op when no reporter is installed.
     */
    protected void reportProgress() {
        if (PigMapReduce.reporter != null) {
            PigMapReduce.reporter.progress();
        }
    }

    // Marker tuples used to delimit bag boundaries in a stream of tuples.
    // NOTE(review): consumers are outside this file (mapreduce layer) —
    // confirm usage there before changing.
    public static abstract class BagDelimiterTuple extends Tuple{}
    public static class StartBag extends BagDelimiterTuple{}

    public static class EndBag extends BagDelimiterTuple{}

    public static final Tuple startBag = new StartBag();
    public static final Tuple endBag = new EndBag();

    protected static final int MAX_SPILL_FILES = 100;
}