added test for block reader definePartitions
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
index 09f891b..50a2061 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
@@ -1,16 +1,21 @@
 package com.datatorrent.lib.io.block;
 
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Stats;
 import com.datatorrent.api.StatsListener;
 
+import com.datatorrent.common.util.Slice;
 import com.datatorrent.lib.counters.BasicCounters;
 
 /**
@@ -67,6 +72,35 @@
     Assert.assertEquals("partition count not changed", 8, sliceReader.getPartitionCount());
   }
 
+  @Test
+  public void testPartitioning() throws Exception
+  {
+    PseudoBatchedOperatorStats readerStats = new PseudoBatchedOperatorStats(2);
+    readerStats.operatorStats = Lists.newArrayList();
+    readerStats.operatorStats.add(new ReaderStats(10, 1, 100, 1));
+
+    TestReader sliceReader = new TestReader();
+
+    StatsListener.Response response = sliceReader.processStats(readerStats);
+
+    Assert.assertTrue("partition needed", response.repartitionRequired);
+    Assert.assertEquals("partition count changed", 8, sliceReader.getPartitionCount());
+
+    List<Partitioner.Partition<AbstractBlockReader<Slice,
+      BlockMetadata.FileBlockMetadata, FSDataInputStream>>> partitions = Lists.newArrayList();
+
+    DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition =
+      new DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(sliceReader);
+
+    PseudoParttion pseudoParttion = new PseudoParttion(apartition, readerStats);
+
+    partitions.add(pseudoParttion);
+
+    Collection<Partitioner.Partition<AbstractBlockReader<Slice,
+      BlockMetadata.FileBlockMetadata, FSDataInputStream>>> newPartitions = sliceReader.definePartitions(partitions, null);
+    Assert.assertEquals(8, newPartitions.size());
+  }
+
   static class PseudoBatchedOperatorStats implements StatsListener.BatchedOperatorStats
   {
 
@@ -121,6 +155,19 @@
     }
   }
 
+  static class PseudoParttion extends DefaultPartition<AbstractBlockReader<Slice,
+    BlockMetadata.FileBlockMetadata, FSDataInputStream>>
+  {
+
+    PseudoParttion(DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata,
+      FSDataInputStream>> defaultPartition, StatsListener.BatchedOperatorStats stats)
+    {
+      super(defaultPartition.getPartitionedInstance(), defaultPartition.getPartitionKeys(),
+        defaultPartition.getLoad(), stats);
+
+    }
+  }
+
   static class ReaderStats extends Stats.OperatorStats
   {