ACCUMULO-532 check in bsp input/output format modified patch to accumulo-contrib

git-svn-id: https://svn.apache.org/repos/asf/accumulo/contrib/trunk/bsp@1329343 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..27a695d
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,22 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>accumulo-contrib</artifactId>
+    <groupId>org.apache.accumulo</groupId>
+    <version>1.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>accumulo-bsp</artifactId>
+  
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hama</groupId>
+      <artifactId>hama-core</artifactId>
+      <version>0.4.0-incubating</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java b/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java
new file mode 100644
index 0000000..883cdfd
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.bsp;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.InputFormat;
+import org.apache.hama.bsp.InputSplit;
+import org.apache.hama.bsp.RecordReader;
+
+public class AccumuloInputFormat extends org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat implements InputFormat<Key,Value> {
+  public class BSPRecordReaderBase extends RecordReaderBase<Key,Value> implements RecordReader<Key,Value> {
+    public BSPRecordReaderBase(InputSplit split, BSPJob job) throws IOException {
+      this.initialize((BSPRangeInputSplit) split, job.getConf());
+    }
+    
+    /*
+     * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return next(currentKey, currentValue);
+    }
+    
+    /*
+     * @see org.apache.hama.bsp.RecordReader#createKey()
+     */
+    @Override
+    public Key createKey() {
+      if (currentKey == null) {
+        return new Key();
+      } else {
+        return currentKey;
+      }
+    }
+    
+    /*
+     * @see org.apache.hama.bsp.RecordReader#createValue()
+     */
+    @Override
+    public Value createValue() {
+      if (currentValue == null) {
+        return new Value(new byte[0]);
+      } else {
+        return currentValue;
+      }
+    }
+    
+    /*
+     * @see org.apache.hama.bsp.RecordReader#getPos()
+     */
+    @Override
+    public long getPos() throws IOException {
+      return 0;
+    }
+    
+    /*
+     * @see org.apache.hama.bsp.RecordReader#next(java.lang.Object, java.lang.Object)
+     */
+    @Override
+    public boolean next(Key k, Value v) throws IOException {
+      if (scannerIterator.hasNext()) {
+        ++numKeysRead;
+        Entry<Key,Value> entry = scannerIterator.next();
+        currentKey = entry.getKey();
+        currentValue = entry.getValue();
+        k.set(currentKey);
+        v.set(currentValue.get());
+        return true;
+      }
+      return false;
+    }
+  }
+  
+  public static class BSPRangeInputSplit extends RangeInputSplit implements InputSplit {
+    public BSPRangeInputSplit() {
+      super();
+    }
+    
+    public BSPRangeInputSplit(RangeInputSplit split) throws IOException {
+      super(split);
+    }
+  }
+  
+  @Override
+  public RecordReader<Key,Value> getRecordReader(InputSplit split, BSPJob job) throws IOException {
+    return new BSPRecordReaderBase(split, job);
+  }
+  
+  @Override
+  public InputSplit[] getSplits(BSPJob job, int arg1) throws IOException {
+    List<org.apache.hadoop.mapreduce.InputSplit> splits = getSplits(job.getConf());
+    InputSplit[] bspSplits = new BSPRangeInputSplit[splits.size()];
+    for (int i = 0; i < splits.size(); i++) {
+      bspSplits[i] = new BSPRangeInputSplit((RangeInputSplit) splits.get(i));
+    }
+    return bspSplits;
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java b/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java
new file mode 100644
index 0000000..1b0ea81
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.bsp;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.OutputFormat;
+import org.apache.hama.bsp.RecordWriter;
+
+public class AccumuloOutputFormat extends org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
+  
+  protected static class BSPRecordWriter extends AccumuloRecordWriter implements RecordWriter<Text,Mutation> {
+    BSPRecordWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException {
+      super(conf);
+    }
+    
+    /*
+     * @see org.apache.hama.bsp.RecordWriter#close()
+     */
+    @Override
+    public void close() throws IOException {
+      try {
+        close(null);
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+    
+  }
+  
+  /*
+   * @see org.apache.hama.bsp.OutputFormat#checkOutputSpecs(org.apache.hadoop.fs.FileSystem, org.apache.hama.bsp.BSPJob)
+   */
+  @Override
+  public void checkOutputSpecs(FileSystem fs, BSPJob job) throws IOException {
+    checkOutputSpecs(job.getConf());
+  }
+  
+  /*
+   * @see org.apache.hama.bsp.OutputFormat#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hama.bsp.BSPJob, java.lang.String)
+   */
+  @Override
+  public RecordWriter<Text,Mutation> getRecordWriter(FileSystem fs, BSPJob job, String arg2) throws IOException {
+    try {
+      return new BSPRecordWriter(job.getConf());
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}
diff --git a/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
new file mode 100644
index 0000000..d6d6908
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.accumulo.bsp.AccumuloInputFormat;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIterator;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIteratorOption;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.InputSplit;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+import org.junit.After;
+import org.junit.Test;
+
+public class AccumuloInputFormatTest {
+  
+  @After
+  public void tearDown() throws Exception {}
+  
+  /**
+   * Test basic setting & getting of max versions.
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  @Test
+  public void testMaxVersions() throws IOException {
+    BSPJob job = new BSPJob();
+    AccumuloInputFormat.setMaxVersions(job.getConf(), 1);
+    int version = AccumuloInputFormat.getMaxVersions(job.getConf());
+    assertEquals(1, version);
+  }
+  
+  @Test(expected = IOException.class)
+  public void testMaxVersionsLessThan1() throws IOException {
+    BSPJob job = new BSPJob();
+    AccumuloInputFormat.setMaxVersions(job.getConf(), 0);
+  }
+  
+  @Test
+  public void testNoMaxVersion() throws IOException {
+    BSPJob job = new BSPJob();
+    assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConf()));
+  }
+  
+  @Test
+  public void testSetIterator() throws IOException {
+    BSPJob job = new BSPJob();
+    
+    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
+    Configuration conf = job.getConf();
+    String iterators = conf.get("AccumuloInputFormat.iterators");
+    assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators);
+  }
+  
+  @Test
+  public void testAddIterator() throws IOException {
+    BSPJob job = new BSPJob();
+    
+    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
+    IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator");
+    iter.addOption("v1", "1");
+    iter.addOption("junk", "\0omg:!\\xyzzy");
+    AccumuloInputFormat.addIterator(job.getConf(), iter);
+    
+    List<AccumuloIterator> list = AccumuloInputFormat.getIterators(job.getConf());
+    
+    // Check the list size
+    assertTrue(list.size() == 3);
+    
+    // Walk the list and make sure our settings are correct
+    AccumuloIterator setting = list.get(0);
+    assertEquals(1, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass());
+    assertEquals("WholeRow", setting.getIteratorName());
+    
+    setting = list.get(1);
+    assertEquals(2, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
+    assertEquals("Versions", setting.getIteratorName());
+    
+    setting = list.get(2);
+    assertEquals(3, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
+    assertEquals("Count", setting.getIteratorName());
+    
+    List<AccumuloIteratorOption> iteratorOptions = AccumuloInputFormat.getIteratorOptions(job.getConf());
+    assertEquals(2, iteratorOptions.size());
+    assertEquals("Count", iteratorOptions.get(0).getIteratorName());
+    assertEquals("Count", iteratorOptions.get(1).getIteratorName());
+    assertEquals("v1", iteratorOptions.get(0).getKey());
+    assertEquals("1", iteratorOptions.get(0).getValue());
+    assertEquals("junk", iteratorOptions.get(1).getKey());
+    assertEquals("\0omg:!\\xyzzy", iteratorOptions.get(1).getValue());
+  }
+  
+  @Test
+  public void testIteratorOptionEncoding() throws Throwable {
+    String key = "colon:delimited:key";
+    String value = "comma,delimited,value";
+    IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class");
+    someSetting.addOption(key, value);
+    BSPJob job = new BSPJob();
+    AccumuloInputFormat.addIterator(job.getConf(), someSetting);
+    
+    final String rawConfigOpt = new AccumuloIteratorOption("iterator", key, value).toString();
+    
+    assertEquals(rawConfigOpt, job.getConf().get("AccumuloInputFormat.iterators.options"));
+    
+    List<AccumuloIteratorOption> opts = AccumuloInputFormat.getIteratorOptions(job.getConf());
+    assertEquals(1, opts.size());
+    assertEquals(opts.get(0).getKey(), key);
+    assertEquals(opts.get(0).getValue(), value);
+    
+    someSetting.addOption(key + "2", value);
+    someSetting.setPriority(2);
+    someSetting.setName("it2");
+    AccumuloInputFormat.addIterator(job.getConf(), someSetting);
+    opts = AccumuloInputFormat.getIteratorOptions(job.getConf());
+    assertEquals(3, opts.size());
+    for (AccumuloIteratorOption opt : opts) {
+      assertEquals(opt.getKey().substring(0, key.length()), key);
+      assertEquals(opt.getValue(), value);
+    }
+  }
+  
+  @Test
+  public void testGetIteratorSettings() throws IOException {
+    BSPJob job = new BSPJob();
+    
+    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
+    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
+    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"));
+    
+    List<AccumuloIterator> list = AccumuloInputFormat.getIterators(job.getConf());
+    
+    // Check the list size
+    assertTrue(list.size() == 3);
+    
+    // Walk the list and make sure our settings are correct
+    AccumuloIterator setting = list.get(0);
+    assertEquals(1, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass());
+    assertEquals("WholeRow", setting.getIteratorName());
+    
+    setting = list.get(1);
+    assertEquals(2, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
+    assertEquals("Versions", setting.getIteratorName());
+    
+    setting = list.get(2);
+    assertEquals(3, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
+    assertEquals("Count", setting.getIteratorName());
+    
+  }
+  
+  @Test
+  public void testSetRegex() throws IOException {
+    BSPJob job = new BSPJob();
+    
+    String regex = ">\"*%<>\'\\";
+    
+    IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
+    RegExFilter.setRegexs(is, regex, null, null, null, false);
+    AccumuloInputFormat.addIterator(job.getConf(), is);
+    
+    assertTrue(regex.equals(AccumuloInputFormat.getIterators(job.getConf()).get(0).getIteratorName()));
+  }
+  
+  static class TestBSP extends BSP<Key,Value,Key,Value> {
+    Key key = null;
+    int count = 0;
+    
+    @Override
+    public void bsp(BSPPeer<Key,Value,Key,Value> peer) throws IOException, SyncException, InterruptedException {
+      // this method reads the next key value record from file
+      KeyValuePair<Key,Value> pair;
+      
+      while ((pair = peer.readNext()) != null) {
+        if (key != null) {
+          assertEquals(key.getRow().toString(), new String(pair.getValue().get()));
+        }
+        
+        assertEquals(pair.getKey().getRow(), new Text(String.format("%09x", count + 1)));
+        assertEquals(new String(pair.getValue().get()), String.format("%09x", count));
+        count++;
+        
+        key = new Key(pair.getKey());
+      }
+      
+      peer.sync();
+      assertEquals(100, count);
+    }
+  }
+  
+  @Test
+  public void testBsp() throws Exception {
+    MockInstance mockInstance = new MockInstance("testmapinstance");
+    Connector c = mockInstance.getConnector("root", new byte[] {});
+    if (c.tableOperations().exists("testtable"))
+      c.tableOperations().delete("testtable");
+    c.tableOperations().create("testtable");
+    
+    BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4);
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+    
+    BSPJob job = new BSPJob(new HamaConfiguration());
+    job.setInputFormat(AccumuloInputFormat.class);
+    job.setBspClass(TestBSP.class);
+    job.setInputPath(new Path("test"));
+    AccumuloInputFormat.setInputInfo(job.getConf(), "root", "".getBytes(), "testtable", new Authorizations());
+    AccumuloInputFormat.setMockInstance(job.getConf(), "testmapinstance");
+    
+    AccumuloInputFormat input = new AccumuloInputFormat();
+    InputSplit[] splits = input.getSplits(job, 0);
+    assertEquals(splits.length, 1);
+    
+    job.waitForCompletion(false);
+  }
+}
diff --git a/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java b/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
new file mode 100644
index 0000000..a02d960
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.bsp.AccumuloInputFormat;
+import org.apache.accumulo.bsp.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.InputSplit;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+import org.junit.Test;
+
+public class AccumuloOutputFormatTest {
+  
+  static class TestBSP extends BSP<Key,Value,Text,Mutation> {
+    Key key = null;
+    int count = 0;
+    
+    @Override
+    public void bsp(BSPPeer<Key,Value,Text,Mutation> peer) throws IOException, SyncException, InterruptedException {
+      // this method reads the next key value record from file
+      KeyValuePair<Key,Value> pair;
+      
+      while ((pair = peer.readNext()) != null) {
+        if (key != null) {
+          assertEquals(key.getRow().toString(), new String(pair.getValue().get()));
+        }
+        
+        assertEquals(pair.getKey().getRow(), new Text(String.format("%09x", count + 1)));
+        assertEquals(new String(pair.getValue().get()), String.format("%09x", count));
+        count++;
+        
+        key = new Key(pair.getKey());
+      }
+      
+      peer.sync();
+    }
+    
+    @Override
+    public void cleanup(BSPPeer<Key,Value,Text,Mutation> peer) throws IOException {
+      Mutation m = new Mutation("total");
+      m.put("", "", Integer.toString(count));
+      peer.write(new Text("testtable2"), m);
+    }
+  }
+  
+  @Test
+  public void testBSP() throws Exception {
+    MockInstance mockInstance = new MockInstance("testmrinstance");
+    Connector c = mockInstance.getConnector("root", new byte[] {});
+    if (c.tableOperations().exists("testtable1"))
+      c.tableOperations().delete("testtable1");
+    if (c.tableOperations().exists("testtable2"))
+      c.tableOperations().delete("testtable2");
+    
+    c.tableOperations().create("testtable1");
+    c.tableOperations().create("testtable2");
+    BatchWriter bw = c.createBatchWriter("testtable1", 10000L, 1000L, 4);
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+    
+    Configuration conf = new Configuration();
+    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
+    bsp.setJobName("Test Input Output");
+    
+    bsp.setBspClass(TestBSP.class);
+    bsp.setInputFormat(AccumuloInputFormat.class);
+    bsp.setInputPath(new Path("test"));
+    
+    bsp.setOutputFormat(AccumuloOutputFormat.class);
+    bsp.setOutputPath(new Path("test"));
+    
+    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputValueClass(Mutation.class);
+    
+    AccumuloInputFormat.setInputInfo(bsp.getConf(), "root", "".getBytes(), "testtable1", new Authorizations());
+    AccumuloInputFormat.setMockInstance(bsp.getConf(), "testmrinstance");
+    AccumuloOutputFormat.setOutputInfo(bsp.getConf(), "root", "".getBytes(), false, "testtable2");
+    AccumuloOutputFormat.setMockInstance(bsp.getConf(), "testmrinstance");
+    
+    AccumuloInputFormat input = new AccumuloInputFormat();
+    InputSplit[] splits = input.getSplits(bsp, 0);
+    assertEquals(splits.length, 1);
+    
+    bsp.waitForCompletion(false);
+    
+    Scanner scanner = c.createScanner("testtable2", new Authorizations());
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    assertTrue(iter.hasNext());
+    Entry<Key,Value> entry = iter.next();
+    assertEquals("total", entry.getKey().getRow().toString());
+    assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+    assertFalse(iter.hasNext());
+    
+  }
+}