Merge branch '1.4' into 1.5
diff --git a/accumulo-pig b/accumulo-pig
new file mode 100755
index 0000000..7bf96e6
--- /dev/null
+++ b/accumulo-pig
@@ -0,0 +1,10 @@
+#!/bin/sh
+
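+# Put the accumulo-pig jar and the Accumulo dependencies on Pig's classpath.
+# Assumes it is run from the project root after a Maven build, so that the
+# ./target path resolves; any arguments are passed through to pig.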
+export PIG_CLASSPATH=./target/accumulo-pig-1.4.4-SNAPSHOT.jar:$ACCUMULO_HOME/lib/accumulo-core-1.4.5-SNAPSHOT.jar:$ACCUMULO_HOME/lib/libthrift-0.6.1.jar:$ACCUMULO_HOME/lib/cloudtrace-1.4.5-SNAPSHOT.jar
+
+pig -v -x mapreduce "$@"
+
+# vim: ft=sh
diff --git a/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
new file mode 100644
index 0000000..30c39c9
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.pig.LoadStoreCaster;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.joda.time.DateTime;
+
+/**
+ * A LoadStoreFunc for retrieving data from and storing data to Accumulo.
+ * 
+ * Each Key/Value pair is returned as the tuple (row, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray; timestamp is
+ * a long.
+ * 
+ * Tuples can be written in two forms: (row, colfam, colqual, colvis, value) OR (row, colfam, colqual, value).
+ * 
+ */
+public class TypedAccumuloStorage extends AbstractAccumuloStorage {
+  private static final Log LOG = LogFactory.getLog(TypedAccumuloStorage.class);
+  protected LoadStoreCaster caster;
+  protected String contextSignature = null;
+  
+  private ResourceSchema schema_;
+  private RequiredFieldList requiredFieldList;
+  
+  public TypedAccumuloStorage() {
+    this.caster = new Utf8StorageConverter();
+  }
+  
+  @Override
+  protected Tuple getTuple(Key key, Value value) throws IOException {
+    // Wrap the key/value pair in a six-field tuple, copying each Text field:
+    // Text.getBytes() returns the backing array, which can be longer than the
+    // valid data it holds.
+    Tuple tuple = TupleFactory.getInstance().newTuple(6);
+    tuple.set(0, new DataByteArray(textToBytes(key.getRow())));
+    tuple.set(1, new DataByteArray(textToBytes(key.getColumnFamily())));
+    tuple.set(2, new DataByteArray(textToBytes(key.getColumnQualifier())));
+    tuple.set(3, new DataByteArray(textToBytes(key.getColumnVisibility())));
+    tuple.set(4, Long.valueOf(key.getTimestamp()));
+    tuple.set(5, new DataByteArray(value.get()));
+    return tuple;
+  }
+  
+  private static byte[] textToBytes(Text text) {
+    return Arrays.copyOf(text.getBytes(), text.getLength());
+  }
+  
+  @Override
+  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+    ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
+    
+    Text t = tupleToText(tuple, 0, fieldSchemas);
+    
+    Mutation mut = new Mutation(t);
+    Text cf = tupleToText(tuple, 1, fieldSchemas);
+    Text cq = tupleToText(tuple, 2, fieldSchemas);
+    
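+    // Five-field tuples are (row, colfam, colqual, colvis, value); four-field tuples omit colvis.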
+    if (tuple.size() > 4) {
+      Text cv = tupleToText(tuple, 3, fieldSchemas);
+      
+      byte[] valueBytes = tupleToBytes(tuple, 4, fieldSchemas);
+      
+      Value val = new Value(valueBytes);
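+      // An empty visibility column means no label; write the cell without one.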
+      if (cv.getLength() == 0) {
+        mut.put(cf, cq, val);
+      } else {
+        mut.put(cf, cq, new ColumnVisibility(cv), val);
+      }
+    } else {
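+      // Four-field form: (row, colfam, colqual, value).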
+      byte[] valueBytes = tupleToBytes(tuple, 3, fieldSchemas);
+      Value val = new Value(valueBytes);
+      mut.put(cf, cq, val);
+    }
+    
+    return Collections.singleton(mut);
+  }
+  
+  @Override
+  public void setUDFContextSignature(String signature) {
+    this.contextSignature = signature;
+  }
+  
+  @Override
+  public void setStoreFuncUDFContextSignature(String signature) {
+    this.contextSignature = signature;
+  }
+  
+  /**
+   * Returns UDFProperties based on <code>contextSignature</code>.
+   */
+  private Properties getUDFProperties() {
+    return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature});
+  }
+  
+  @Override
+  public void checkSchema(ResourceSchema s) throws IOException {
+    if (!(caster instanceof LoadStoreCaster)) {
+      LOG.error("Caster must implement LoadStoreCaster for writing to HBase.");
+      throw new IOException("Bad Caster " + caster.getClass());
+    }
+    schema_ = s;
+    getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema_));
+  }
+  
+  private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+    
+    return objToText(o, type);
+  }
+  
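+  // Prefer the declared schema type; fall back to DataType.findType when no schema is available.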
+  private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
+    return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
+  }
+  
+  private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+    
+    return objToBytes(o, type);
+    
+  }
+  
+  private Text objToText(Object o, byte type) throws IOException {
+    return new Text(objToBytes(o, type));
+  }
+  
+  @SuppressWarnings("unchecked")
+  private byte[] objToBytes(Object o, byte type) throws IOException {
+    if (o == null)
+      return null;
+    switch (type) {
+      case DataType.BYTEARRAY:
+        return ((DataByteArray) o).get();
+      case DataType.BAG:
+        return caster.toBytes((DataBag) o);
+      case DataType.CHARARRAY:
+        return caster.toBytes((String) o);
+      case DataType.DOUBLE:
+        return caster.toBytes((Double) o);
+      case DataType.FLOAT:
+        return caster.toBytes((Float) o);
+      case DataType.INTEGER:
+        return caster.toBytes((Integer) o);
+      case DataType.LONG:
+        return caster.toBytes((Long) o);
+      case DataType.BIGINTEGER:
+        return caster.toBytes((BigInteger) o);
+      case DataType.BIGDECIMAL:
+        return caster.toBytes((BigDecimal) o);
+      case DataType.BOOLEAN:
+        return caster.toBytes((Boolean) o);
+      case DataType.DATETIME:
+        return caster.toBytes((DateTime) o);
+        
+      case DataType.MAP:
+        // The type conversion here is unchecked; we rely on
+        // DataType.findType to do the right thing.
+        return caster.toBytes((Map<String,Object>) o);
+        
+      case DataType.NULL:
+        return null;
+      case DataType.TUPLE:
+        return caster.toBytes((Tuple) o);
+      case DataType.ERROR:
+        throw new IOException("Unable to determine type of " + o.getClass());
+      default:
+        throw new IOException("Unable to find a converter for tuple field " + o);
+    }
+  }
+  
+}
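
Note on the write path: getMutations above accepts both tuple forms. Below is
a minimal sketch that exercises each form directly, assuming the project jar
and its Pig/Accumulo dependencies are on the classpath and that
AbstractAccumuloStorage's no-arg constructor needs no further setup; the class
name TupleFormCheck and the literal values are illustrative only:

    import java.util.Collection;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.pig.TypedAccumuloStorage;
    import org.apache.pig.data.DataByteArray;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class TupleFormCheck {
      public static void main(String[] args) throws Exception {
        TypedAccumuloStorage storage = new TypedAccumuloStorage();
        TupleFactory tf = TupleFactory.getInstance();

        // Four-field form: (row, colfam, colqual, value); no visibility is set.
        Tuple noVis = tf.newTuple(4);
        noVis.set(0, new DataByteArray("row1"));
        noVis.set(1, new DataByteArray("fam"));
        noVis.set(2, new DataByteArray("qual"));
        noVis.set(3, new DataByteArray("value"));

        // Five-field form: (row, colfam, colqual, colvis, value).
        Tuple withVis = tf.newTuple(5);
        withVis.set(0, new DataByteArray("row1"));
        withVis.set(1, new DataByteArray("fam"));
        withVis.set(2, new DataByteArray("qual"));
        withVis.set(3, new DataByteArray("public"));
        withVis.set(4, new DataByteArray("value"));

        // Each call yields a singleton collection holding one Mutation for row1.
        Collection<Mutation> a = storage.getMutations(noVis);
        Collection<Mutation> b = storage.getMutations(withVis);
        System.out.println(a.size() + " " + b.size());
      }
    }

Because no ResourceSchema is set here, schemaToType falls back to
DataType.findType, so each DataByteArray field takes the BYTEARRAY path in
objToBytes.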