blob: ba04e7a6ed80a00723f5fb7d3887c6a067ca0a3f [file] [log] [blame]
package datafu.pig.util;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
/**
* Captures the schema of the input tuple on the front-end, stores it into the UDFContext, and provides
* an easy means of referencing it on the back-end to aid in writing schema-based UDFs.
*
* Example: This example computes the monthly payments for mortgages depending on interest rate.
* <code>
* public class MortgagePayment extends AliasableEvalFunc<DataBag> {
* ...
* public DataBag exec(Tuple input) throws IOException {
* DataBag output = BagFactory.getInstance().newDefaultBag();
*
* Double principal = getDouble(input, "principal"); // get a value from the input tuple by alias
* Integer numPayments = getInteger(input, "num_payments");
* DataBag interestRates = getBag(input, "interest_rates");
*
* for (Tuple interestTuple : interestRates) {
* Double interest = getDouble(interestTuple, getPrefixedAliasName("interest_rates", "interest_rate")); // get a value from the inner bag tuple by alias
* double monthlyPayment = computeMonthlyPayment(principal, numPayments, interest);
* output.add(TupleFactory.getInstance().newTuple(monthlyPayment));
* }
* return output;
* }
* </code>
*
* <h3>Comparison versus {@link SimpleEvalFunc}</h3>
* <p>
* SimpleEvalFunc and AliasableEvalFunc are actually fairly different. The primary purpose of simple is
* to skip the boilerplate under the assumption that the arguments in and out are well... simple.
* It also assumes that these arguments are in a well-defined positional ordering.
* <p>
* Aliasable allows the UDF writer to avoid dealing with all positional assumptions and instead reference fields
* by their aliases. This practice allows for more readable code since the alias names should have more meaning
* to the reader than the position. This approach is also less error prone since it creates a more explicit contract
* of what input the UDF expects and prevents simple mistakes that positional-based UDFs could not easily catch,
* such as transposing two fields of the same type. If this contract is violated, say, by attempting to reference
* a field that is not present, a meaningful error message may be thrown.
*
* @author wvaughan
*
* @param <T>
*/
public abstract class AliasableEvalFunc<T> extends EvalFunc<T>
{
private static final String ALIAS_MAP_PROPERTY = "aliasMap";
private String instanceName;
private Map<String, Integer> aliasToPosition = null;
public AliasableEvalFunc() {
}
/**
* A wrapper method which captures the schema and then calls getOutputSchema
*/
@Override
public Schema outputSchema(Schema input) {
storeFieldAliases(input);
return getOutputSchema(input);
}
@Override
public void setUDFContextSignature(String signature) {
setInstanceName(signature);
}
/**
* Specify the output schema as in {link EvalFunc#outputSchema(Schema)}.
*
* @param input
* @return outputSchema
*/
public abstract Schema getOutputSchema(Schema input);
/**
* Helper method to return the context properties for this class
*
* @return
*/
protected Properties getContextProperties() {
UDFContext context = UDFContext.getUDFContext();
Properties properties = context.getUDFProperties(this.getClass());
return properties;
}
/**
* Helper method to return the context properties for this instance of this class
*
* @return
*/
protected Properties getInstanceProperties() {
Properties contextProperties = getContextProperties();
if (!contextProperties.containsKey(getInstanceName())) {
contextProperties.put(getInstanceName(), new Properties());
}
return (Properties)contextProperties.get(getInstanceName());
}
@SuppressWarnings("unchecked")
private Map<String, Integer> getAliasMap() {
return (Map<String, Integer>)getInstanceProperties().get(ALIAS_MAP_PROPERTY);
}
private void setAliasMap(Map<String, Integer> aliases) {
getInstanceProperties().put(ALIAS_MAP_PROPERTY, aliases);
}
private void storeFieldAliases(Schema tupleSchema)
{
Map<String, Integer> aliases = new HashMap<String, Integer>();
constructFieldAliases(aliases, tupleSchema, null);
log.info("In instance: "+getInstanceName()+", stored alias map: " + aliases);
// pass the input schema into the exec function
setAliasMap(aliases);
}
private void constructFieldAliases(Map<String, Integer> aliases, Schema tupleSchema, String prefix)
{
int position = 0;
for (Schema.FieldSchema field : tupleSchema.getFields()) {
String alias = getPrefixedAliasName(prefix, field.alias);
if (field.alias != null && !field.alias.equals("null")) {
aliases.put(alias, position);
log.trace("In instance: "+getInstanceName()+", stored alias " + alias + " as position " + position);
}
if (field.schema != null) {
constructFieldAliases(aliases, field.schema, alias);
}
position++;
}
}
protected String getPrefixedAliasName(String prefix, String alias)
{
if (alias == null || alias.equals("null")) {
if (prefix == null) return "";
else return prefix; // ignore the null inner bags/tuples
}
else return ((prefix == null || prefix.equals("null") || prefix.trim().equals("")) ? "" : prefix+".") + alias; // handle top bag
}
/**
* Field aliases are generated from the input schema<br/>
* Each alias maps to a bag position<br/>
* Inner bags/tuples will have alias of outer.inner.foo
*
* @return A map of field alias to field position
*/
protected Map<String, Integer> getFieldAliases()
{
Map<String, Integer> aliases = getAliasMap();
if (aliases == null) {
log.error("Class: " + this.getClass());
log.error("Instance name: " + this.getInstanceName());
log.error("Properties: " + getContextProperties());
throw new RuntimeException("Could not retrieve aliases from properties using " + ALIAS_MAP_PROPERTY);
}
return aliases;
}
protected Integer getPosition(String alias) {
if (aliasToPosition == null) {
aliasToPosition = getFieldAliases();
}
return aliasToPosition.get(alias);
}
protected Integer getPosition(String prefix, String alias) {
return getPosition(getPrefixedAliasName(prefix, alias));
}
protected Integer getInteger(Tuple tuple, String alias) throws ExecException {
return getInteger(tuple, alias, null);
}
protected Integer getInteger(Tuple tuple, String alias, Integer defaultValue) throws ExecException {
Integer i = getPosition(alias);
if (i == null) throw new FieldNotFound("Attempt to reference unknown alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
if (i >= tuple.size()) throw new FieldNotFound("Attempt to reference outside of tuple for alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
Number number = (Number)tuple.get(i);
if (number == null) return defaultValue;
return number.intValue();
}
protected Long getLong(Tuple tuple, String alias) throws ExecException {
return getLong(tuple, alias, null);
}
protected Long getLong(Tuple tuple, String alias, Long defaultValue) throws ExecException {
Integer i = getPosition(alias);
if (i == null) throw new FieldNotFound("Attempt to reference unknown alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
if (i >= tuple.size()) throw new FieldNotFound("Attempt to reference outside of tuple for alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
Number number = (Number)tuple.get(i);
if (number == null) return defaultValue;
return number.longValue();
}
protected Float getFloat(Tuple tuple, String alias) throws ExecException {
return getFloat(tuple, alias, null);
}
protected Float getFloat(Tuple tuple, String alias, Float defaultValue) throws ExecException {
Integer i = getPosition(alias);
if (i == null) throw new FieldNotFound("Attempt to reference unknown alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
if (i >= tuple.size()) throw new FieldNotFound("Attempt to reference outside of tuple for alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
Number number = (Number)tuple.get(i);
if (number == null) return defaultValue;
return number.floatValue();
}
protected Double getDouble(Tuple tuple, String alias) throws ExecException {
return getDouble(tuple, alias, null);
}
protected Double getDouble(Tuple tuple, String alias, Double defaultValue) throws ExecException {
Integer i = getPosition(alias);
if (i == null) throw new FieldNotFound("Attempt to reference unknown alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
if (i >= tuple.size()) throw new FieldNotFound("Attempt to reference outside of tuple for alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
Number number = (Number)tuple.get(i);
if (number == null) return defaultValue;
return number.doubleValue();
}
protected String getString(Tuple tuple, String alias) throws ExecException {
return getString(tuple, alias, null);
}
protected String getString(Tuple tuple, String alias, String defaultValue) throws ExecException {
Integer i = getPosition(alias);
if (i == null) throw new FieldNotFound("Attempt to reference unknown alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
if (i >= tuple.size()) throw new FieldNotFound("Attempt to reference outside of tuple for alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
String s = (String)tuple.get(i);
if (s == null) return defaultValue;
return s;
}
protected Boolean getBoolean(Tuple tuple, String alias) throws ExecException {
Integer i = getPosition(alias);
if (i == null) throw new FieldNotFound("Attempt to reference unknown alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
if (i >= tuple.size()) throw new FieldNotFound("Attempt to reference outside of tuple for alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
return (Boolean)tuple.get(i);
}
protected DataBag getBag(Tuple tuple, String alias) throws ExecException {
Integer i = getPosition(alias);
if (i == null) throw new FieldNotFound("Attempt to reference unknown alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
if (i >= tuple.size()) throw new FieldNotFound("Attempt to reference outside of tuple for alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
return (DataBag)tuple.get(i);
}
protected Object getObject(Tuple tuple, String alias) throws ExecException {
Integer i = getPosition(alias);
if (i == null) throw new FieldNotFound("Attempt to reference unknown alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
if (i >= tuple.size()) throw new FieldNotFound("Attempt to reference outside of tuple for alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
return tuple.get(i);
}
/**
*
* @return the name of this instance corresponding to the UDF Context Signature
* @see #setUDFContextSignature(String)
*/
protected String getInstanceName() {
if (instanceName == null) {
throw new RuntimeException("Instance name is null. This should not happen unless UDFContextSignature was not set.");
}
return instanceName;
}
private void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
}