Merge branch '1.4' into 1.5
diff --git a/.gitignore b/.gitignore
index f0e4507..1121465 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,6 @@
+*~
 .classpath
 .project
 .settings
+/lib
 /target
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..7f58156
--- /dev/null
+++ b/README.md
@@ -0,0 +1,90 @@
+Build the JAR (note: you will need to download the Accumulo source, build it, and install it into your local Maven repository before this will work)
+
+    mvn package
+
+Download the JARs needed by Pig
+
+    mvn dependency:copy-dependencies -DoutputDirectory=lib -DincludeArtifactIds=zookeeper,libthrift,accumulo-core,cloudtrace
+
+Print the register statements we will need in Pig
+
+    for JAR in lib/*.jar target/accumulo-pig-1.4.0.jar
+    do
+        echo register `pwd`/$JAR
+    done
+
+Example output
+
+    register /home/developer/workspace/accumulo-pig/lib/accumulo-core-1.4.0.jar
+    register /home/developer/workspace/accumulo-pig/lib/cloudtrace-1.4.0.jar
+    register /home/developer/workspace/accumulo-pig/lib/libthrift-0.6.1.jar
+    register /home/developer/workspace/accumulo-pig/lib/zookeeper-3.3.1.jar
+    register /home/developer/workspace/accumulo-pig/target/accumulo-pig-1.4.0.jar
+
+Run Pig, copy the register statements above, and paste them into the Pig terminal. Then you can LOAD from and STORE into Accumulo.
+
+    $ pig
+    2012-03-02 08:15:25,808 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/developer/workspace/accumulo-pig/pig_1330694125807.log
+    2012-03-02 08:15:25,937 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://127.0.0.1/
+    2012-03-02 08:15:26,032 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: 127.0.0.1:9001
+    grunt> register /home/developer/workspace/accumulo-pig/lib/accumulo-core-1.4.0.jar
+    grunt> register /home/developer/workspace/accumulo-pig/lib/cloudtrace-1.4.0.jar
+    grunt> register /home/developer/workspace/accumulo-pig/lib/libthrift-0.6.1.jar
+    grunt> register /home/developer/workspace/accumulo-pig/lib/zookeeper-3.3.1.jar
+    grunt> register /home/developer/workspace/accumulo-pig/target/accumulo-pig-1.4.0.jar
+    grunt> 
+    grunt> DATA = LOAD 'accumulo://webpage?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181&columns=f:cnt' 
+    >>    using org.apache.accumulo.pig.AccumuloStorage() AS (row, cf, cq, cv, ts, val);
+    grunt> 
+    grunt> DATA2 = FOREACH DATA GENERATE row, cf, cq, cv, val;
+    grunt> 
+    grunt> STORE DATA2 into 'accumulo://webpage_content?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181' using org.apache.accumulo.pig.AccumuloStorage();
+    2012-03-02 08:18:44,090 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN
+    2012-03-02 08:18:44,093 [main] INFO  org.apache.pig.newplan.logical.rules.ColumnPruneVisitor - Columns pruned for DATA: $4
+    2012-03-02 08:18:44,108 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
+    2012-03-02 08:18:44,110 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
+    2012-03-02 08:18:44,110 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
+    2012-03-02 08:18:44,117 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
+    2012-03-02 08:18:44,118 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
+    2012-03-02 08:18:44,120 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job7611629033341757288.jar
+    2012-03-02 08:18:46,282 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job7611629033341757288.jar created
+    2012-03-02 08:18:46,286 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
+    2012-03-02 08:18:46,375 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
+    2012-03-02 08:18:46,876 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
+    2012-03-02 08:18:46,878 [Thread-17] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
+    2012-03-02 08:18:47,887 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201203020643_0001
+    2012-03-02 08:18:47,887 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://127.0.0.1:50030/jobdetails.jsp?jobid=job_201203020643_0001
+    2012-03-02 08:18:54,434 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete
+    2012-03-02 08:18:57,484 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
+    2012-03-02 08:18:57,485 [main] INFO  org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics: 
+
+    HadoopVersion    PigVersion    UserId    StartedAt    FinishedAt    Features
+    0.20.2    0.9.2    developer    2012-03-02 08:18:44    2012-03-02 08:18:57    UNKNOWN
+
+    Success!
+
+    Job Stats (time in seconds):
+    JobId    Maps    Reduces    MaxMapTime    MinMapTIme    AvgMapTime    MaxReduceTime    MinReduceTime    AvgReduceTime    Alias    Feature    Outputs
+    job_201203020643_0001    1    0    3    3    3    0    0    0    DATA,DATA2    MAP_ONLY    accumulo://webpage_content?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181,
+
+    Input(s):
+    Successfully read 288 records from: "accumulo://webpage?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181&columns=f:cnt"
+
+    Output(s):
+    Successfully stored 288 records in: "accumulo://webpage_content?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181"
+
+    Counters:
+    Total records written : 288
+    Total bytes written : 0
+    Spillable Memory Manager spill count : 0
+    Total bags proactively spilled: 0
+    Total records proactively spilled: 0
+
+    Job DAG:
+    job_201203020643_0001
+
+
+    2012-03-02 08:18:57,492 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
+    grunt> 
+
+
diff --git a/pom.xml b/pom.xml
index 249dcce..37f2841 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.accumulo</groupId>
   <artifactId>accumulo-pig</artifactId>
-  <version>1.4.4-SNAPSHOT</version>
+  <version>1.5.0-SNAPSHOT</version>
   
   <dependencies>
     <dependency>
@@ -32,20 +32,77 @@
       <version>1.0.4</version>
     </dependency>
     <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.16</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
       <version>1.2.1</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.accumulo</groupId>
-      <artifactId>accumulo-core</artifactId>
-      <version>1.4.4</version>
-    </dependency>
+    <!-- Needed by Apache Pig -->
     <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
       <version>1.6</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
+      <version>1.5.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.9.0</version>
+    </dependency>
   </dependencies>
-  
+
+  <profiles>
+    <profile>
+      <id>assemble</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <source>1.6</source>
+              <target>1.6</target>
+            </configuration>
+          </plugin>
+
+          <plugin>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <configuration>
+              <descriptorRefs>
+                <descriptorRef>jar-with-dependencies</descriptorRef>
+              </descriptorRefs>
+              <archive>
+                <manifest>
+                  <mainClass />
+                </manifest>
+              </archive>
+            </configuration>
+            <executions>
+              <execution>
+                <id>make-assembly</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>single</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
 </project>
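
The new `assemble` profile binds the maven-assembly-plugin's `single` goal to the `package` phase with the `jar-with-dependencies` descriptor, so a self-contained JAR can be built on demand. A minimal invocation, assuming only the profile id shown in the diff:

    mvn package -Passemble

By the assembly plugin's default naming convention this should additionally produce `target/accumulo-pig-1.5.0-SNAPSHOT-jar-with-dependencies.jar` alongside the plain artifact.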
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index d26cf40..cf152a7 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -21,9 +21,14 @@
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.lib.util.ConfiguratorBase;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -185,26 +190,30 @@
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    if (!conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured", false)) {
-      AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
-      AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-      if (columnFamilyColumnQualifierPairs.size() > 0) {
-        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
-        AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
+    if (!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf)) {
+      AccumuloInputFormat.setInputTableName(job, table);
+      AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+      AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
+      
+      try {
+        AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+      } catch (AccumuloSecurityException e) {
+        throw new IOException(e);
       }
       
-      AccumuloInputFormat.setRanges(conf, Collections.singleton(new Range(start, end)));
+      if (columnFamilyColumnQualifierPairs.size() > 0) {
+        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+        AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
+      }
+      
+      AccumuloInputFormat.setRanges(job, Collections.singleton(new Range(start, end)));
       configureInputFormat(conf);
     }
   }
   
-  protected void configureInputFormat(Configuration conf) {
-    
-  }
+  protected void configureInputFormat(Configuration conf) {}
   
-  protected void configureOutputFormat(Configuration conf) {
-    
-  }
+  protected void configureOutputFormat(Configuration conf) {}
   
   @Override
   public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
@@ -212,14 +221,10 @@
   }
   
   @Override
-  public void setUDFContextSignature(String signature) {
-    
-  }
+  public void setUDFContextSignature(String signature) {}
   
   /* StoreFunc methods */
-  public void setStoreFuncUDFContextSignature(String signature) {
-    
-  }
+  public void setStoreFuncUDFContextSignature(String signature) {}
   
   public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
     return relativeToAbsolutePath(location, curDir);
@@ -229,13 +234,28 @@
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    if (!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured", false)) {
-      AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
-      AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-      AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
-      AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
-      AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
-      configureOutputFormat(conf);
+    try {
+      if (!ConfiguratorBase.isConnectorInfoSet(AccumuloOutputFormat.class, conf)) {
+        BatchWriterConfig bwConfig = new BatchWriterConfig();
+        bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
+        bwConfig.setMaxMemory(maxMutationBufferSize);
+        bwConfig.setMaxWriteThreads(maxWriteThreads);
+        AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+        
+        AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
+        AccumuloOutputFormat.setDefaultTableName(job, table);
+        AccumuloOutputFormat.setCreateTables(job, true);
+        
+        try {
+          AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+        } catch (AccumuloSecurityException e) {
+          throw new IOException(e);
+        }
+        
+        configureOutputFormat(conf);
+      }
+    } catch (IllegalStateException e1) {
+      LOG.error("Accumulo output format was already configured", e1);
     }
   }
   
@@ -267,6 +287,7 @@
   }
   
   public void cleanupOnFailure(String failure, Job job) {}
-
-  public void cleanupOnSuccess(String location, Job job) {}
+  
+  @Override
+  public void cleanupOnSuccess(String location, Job job) throws IOException {}
 }
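
The hunks above track the Accumulo 1.4 to 1.5 API change: the monolithic `setInputInfo`/`setOutputInfo` calls are replaced by per-concern setters that take the `Job` rather than its `Configuration`, credentials move into a `PasswordToken`, and batch-writer tuning moves into `BatchWriterConfig`. A condensed sketch of the same calls outside of Pig (the class name, table names, and connection values are illustrative placeholders, not from the codebase):

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    import org.apache.accumulo.core.client.AccumuloSecurityException;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.mapreduce.Job;

    public class ConfiguratorSketch {
      static void configure(Job job) throws IOException {
        try {
          // Input side: each concern gets its own setter in 1.5.
          AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken("secret".getBytes()));
          AccumuloInputFormat.setZooKeeperInstance(job, "inst", "127.0.0.1:2181");
          AccumuloInputFormat.setInputTableName(job, "webpage");
          AccumuloInputFormat.setScanAuthorizations(job, new Authorizations());

          // Output side: buffer size, latency, and thread count now live
          // in BatchWriterConfig instead of individual static setters.
          BatchWriterConfig bwConfig = new BatchWriterConfig();
          bwConfig.setMaxLatency(10000L, TimeUnit.MILLISECONDS);
          bwConfig.setMaxMemory(10000000L);
          bwConfig.setMaxWriteThreads(10);
          AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken("secret".getBytes()));
          AccumuloOutputFormat.setZooKeeperInstance(job, "inst", "127.0.0.1:2181");
          AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
          AccumuloOutputFormat.setDefaultTableName(job, "webpage_content");
          AccumuloOutputFormat.setCreateTables(job, true);
        } catch (AccumuloSecurityException e) {
          // setConnectorInfo validates the token and can reject it.
          throw new IOException(e);
        }
      }
    }

The configurators refuse to be invoked a second time on the same configuration, which is why the storage class guards with `ConfiguratorBase.isConnectorInfoSet` and catches `IllegalStateException` before configuring.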
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 15b1c47..4de1618 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -61,23 +61,29 @@
   
   @Override
   public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
-    Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
-    Text cf = Utils.objToText(tuple.get(1));
-    Text cq = Utils.objToText(tuple.get(2));
     
-    if (tuple.size() > 4) {
-      Text cv = Utils.objToText(tuple.get(3));
-      Value val = new Value(Utils.objToBytes(tuple.get(4)));
-      if (cv.getLength() == 0) {
-        mut.put(cf, cq, val);
+    try {
+      Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
+      Text cf = Utils.objToText(tuple.get(1));
+      Text cq = Utils.objToText(tuple.get(2));
+      
+      if (tuple.size() > 4) {
+        Text cv = Utils.objToText(tuple.get(3));
+        Value val = new Value(Utils.objToBytes(tuple.get(4)));
+        if (cv.getLength() == 0) {
+          mut.put(cf, cq, val);
+        } else {
+          mut.put(cf, cq, new ColumnVisibility(cv), val);
+        }
       } else {
-        mut.put(cf, cq, new ColumnVisibility(cv), val);
+        Value val = new Value(Utils.objToBytes(tuple.get(3)));
+        mut.put(cf, cq, val);
       }
-    } else {
-      Value val = new Value(Utils.objToBytes(tuple.get(3)));
-      mut.put(cf, cq, val);
+      
+      return Collections.singleton(mut);
+    } catch (IOException e) {
+      System.err.println("Error on Tuple: " + tuple);
+      throw e;
     }
-    
-    return Collections.singleton(mut);
   }
 }
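
Read alongside the README session, `getMutations` maps a five-field tuple `(row, cf, cq, cv, val)` to one `Mutation`, falling back to the unlabeled `put` when the visibility is empty and to a four-field `(row, cf, cq, val)` layout when the tuple is shorter. A standalone sketch of that mapping (the class and method names here are illustrative):

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.ColumnVisibility;
    import org.apache.hadoop.io.Text;

    public class MutationSketch {
      // Mirrors the five-field branch of getMutations.
      static Mutation toMutation(String row, String cf, String cq, String cv, byte[] val) {
        Mutation mut = new Mutation(new Text(row));
        Text cfText = new Text(cf);
        Text cqText = new Text(cq);
        Value value = new Value(val);
        if (cv == null || cv.length() == 0) {
          // Empty visibility: write an unlabeled cell.
          mut.put(cfText, cqText, value);
        } else {
          // Non-empty visibility: attach the label expression.
          mut.put(cfText, cqText, new ColumnVisibility(cv), value);
        }
        return mut;
      }
    }

Each stored tuple thus becomes exactly one cell, returned as a singleton mutation collection.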
diff --git a/src/main/java/org/apache/accumulo/pig/Utils.java b/src/main/java/org/apache/accumulo/pig/Utils.java
index 199778a..4fecd99 100644
--- a/src/main/java/org/apache/accumulo/pig/Utils.java
+++ b/src/main/java/org/apache/accumulo/pig/Utils.java
@@ -25,6 +25,10 @@
   }
   
   public static byte[] objToBytes(Object o) {
+    if (o == null) {
+      return new byte[0];
+    }
+    
     if (o instanceof String) {
       String str = (String) o;
       return str.getBytes();
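
The new null guard in `objToBytes` means a null Pig field now serializes as a zero-length value rather than falling through the type checks with a null reference. A quick illustration, assuming only the signatures visible in this hunk:

    import org.apache.accumulo.pig.Utils;

    public class NullGuardSketch {
      public static void main(String[] args) {
        // Null fields (e.g. from an outer join) become empty byte arrays.
        byte[] empty = Utils.objToBytes(null);
        System.out.println(empty.length);        // 0
        // Strings still go through String.getBytes().
        byte[] bytes = Utils.objToBytes("cnt");
        System.out.println(bytes.length);        // 3
      }
    }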