added test for block reader definePartitions
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
index 09f891b..50a2061 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
@@ -1,16 +1,21 @@
package com.datatorrent.lib.io.block;
+import java.util.Collection;
import java.util.List;
import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.Lists;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
+import com.datatorrent.common.util.Slice;
import com.datatorrent.lib.counters.BasicCounters;
/**
@@ -67,6 +72,35 @@
Assert.assertEquals("partition count not changed", 8, sliceReader.getPartitionCount());
}
+ @Test
+ public void testPartitioning() throws Exception
+ {
+ PseudoBatchedOperatorStats readerStats = new PseudoBatchedOperatorStats(2);
+ readerStats.operatorStats = Lists.newArrayList();
+ readerStats.operatorStats.add(new ReaderStats(10, 1, 100, 1));
+
+ TestReader sliceReader = new TestReader();
+
+ StatsListener.Response response = sliceReader.processStats(readerStats);
+
+ Assert.assertTrue("partition needed", response.repartitionRequired);
+ Assert.assertEquals("partition count changed", 8, sliceReader.getPartitionCount());
+
+ List<Partitioner.Partition<AbstractBlockReader<Slice,
+ BlockMetadata.FileBlockMetadata, FSDataInputStream>>> partitions = Lists.newArrayList();
+
+ DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition =
+ new DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(sliceReader);
+
+ PseudoParttion pseudoParttion = new PseudoParttion(apartition, readerStats);
+
+ partitions.add(pseudoParttion);
+
+ Collection<Partitioner.Partition<AbstractBlockReader<Slice,
+ BlockMetadata.FileBlockMetadata, FSDataInputStream>>> newPartitions = sliceReader.definePartitions(partitions, null);
+ Assert.assertEquals(8, newPartitions.size());
+ }
+
static class PseudoBatchedOperatorStats implements StatsListener.BatchedOperatorStats
{
@@ -121,6 +155,19 @@
}
}
+ static class PseudoParttion extends DefaultPartition<AbstractBlockReader<Slice,
+ BlockMetadata.FileBlockMetadata, FSDataInputStream>>
+ {
+
+ PseudoParttion(DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata,
+ FSDataInputStream>> defaultPartition, StatsListener.BatchedOperatorStats stats)
+ {
+ super(defaultPartition.getPartitionedInstance(), defaultPartition.getPartitionKeys(),
+ defaultPartition.getLoad(), stats);
+
+ }
+ }
+
static class ReaderStats extends Stats.OperatorStats
{