Merge pull request #500 from metamx/batch-ingestion-fixes

Batch ingestion fixes

This merge records the `partitioned` column as a 0/1 flag instead of the shard's partition number (DbUpdaterJob, DbSegmentPublisher), adds a `numShards` option to the hashed and random partitions specs so the shard count can be fixed up front, routes DetermineHashedPartitionsJob's group-by output to one reducer per segment interval via a new partitioner, drops the MAX_SHARDS cap, and fails fast when interval or partition info paths are missing instead of logging and skipping them.
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
index 543d2c4..a238d5e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
@@ -23,6 +23,7 @@
 import com.metamx.common.logger.Logger;
 import io.druid.db.DbConnector;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -80,7 +81,7 @@
                       .put("created_date", new DateTime().toString())
                       .put("start", segment.getInterval().getStart().toString())
                       .put("end", segment.getInterval().getEnd().toString())
-                      .put("partitioned", segment.getShardSpec().getPartitionNum())
+                      .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
                       .put("version", segment.getVersion())
                       .put("used", true)
                       .put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment))
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index ae2d61a..530d155 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -37,6 +37,7 @@
 import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -65,7 +67,6 @@
  */
 public class DetermineHashedPartitionsJob implements Jobby
 {
-  private static final int MAX_SHARDS = 128;
   private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
   private final HadoopDruidIndexerConfig config;
 
@@ -98,8 +99,11 @@
       groupByJob.setOutputKeyClass(NullWritable.class);
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+      groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
       if (!config.getSegmentGranularIntervals().isPresent()) {
         groupByJob.setNumReduceTasks(1);
+      } else {
+        groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
       }
       JobHelper.setupClasspath(config, groupByJob);
 
@@ -124,9 +128,6 @@
       if (!config.getSegmentGranularIntervals().isPresent()) {
         final Path intervalInfoPath = config.makeIntervalInfoPath();
         fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
-        if (!fileSystem.exists(intervalInfoPath)) {
-          throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
-        }
         List<Interval> intervals = config.jsonMapper.readValue(
             Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
         {
@@ -144,37 +145,25 @@
         if (fileSystem == null) {
           fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
         }
-        if (fileSystem.exists(partitionInfoPath)) {
-          Long cardinality = config.jsonMapper.readValue(
-              Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
-          {
-          }
-          );
-          int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
-
-          if (numberOfShards > MAX_SHARDS) {
-            throw new ISE(
-                "Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
-                numberOfShards,
-                MAX_SHARDS
-            );
-          }
-
-          List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
-          if (numberOfShards == 1) {
-            actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
-          } else {
-            for (int i = 0; i < numberOfShards; ++i) {
-              actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
-              log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
-            }
-          }
-
-          shardSpecs.put(bucket, actualSpecs);
-
-        } else {
-          log.info("Path[%s] didn't exist!?", partitionInfoPath);
+        final Long cardinality = config.jsonMapper.readValue(
+            Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
+        {
         }
+        );
+        final int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
+
+        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
+        if (numberOfShards == 1) {
+          actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
+        } else {
+          for (int i = 0; i < numberOfShards; ++i) {
+            actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
+            log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
+          }
+        }
+
+        shardSpecs.put(bucket, actualSpecs);
+
       }
       config.setShardSpecs(shardSpecs);
       log.info(
@@ -319,13 +308,6 @@
       }
     }
 
-    private byte[] getDataBytes(BytesWritable writable)
-    {
-      byte[] rv = new byte[writable.getLength()];
-      System.arraycopy(writable.getBytes(), 0, rv, 0, writable.getLength());
-      return rv;
-    }
-
     @Override
     public void run(Context context)
         throws IOException, InterruptedException
@@ -353,6 +335,50 @@
       }
     }
   }
+
+  public static class DetermineHashedPartitionsPartitioner
+      extends Partitioner<LongWritable, BytesWritable> implements Configurable
+  {
+    private Configuration config;
+    private boolean determineIntervals;
+    private Map<LongWritable, Integer> reducerLookup;
+
+    @Override
+    public int getPartition(LongWritable interval, BytesWritable text, int numPartitions)
+    {
+
+      if (config.get("mapred.job.tracker").equals("local") || determineIntervals) {
+        return 0;
+      } else {
+        return reducerLookup.get(interval);
+      }
+    }
+
+    @Override
+    public Configuration getConf()
+    {
+      return config;
+    }
+
+    @Override
+    public void setConf(Configuration config)
+    {
+      this.config = config;
+      HadoopDruidIndexerConfig hadoopConfig = HadoopDruidIndexerConfigBuilder.fromConfiguration(config);
+      if (hadoopConfig.getSegmentGranularIntervals().isPresent()) {
+        determineIntervals = false;
+        int reducerNumber = 0;
+        ImmutableMap.Builder<LongWritable, Integer> builder = ImmutableMap.builder();
+        for (Interval interval : hadoopConfig.getSegmentGranularIntervals().get()) {
+          builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++);
+        }
+        reducerLookup = builder.build();
+      } else {
+        determineIntervals = true;
+      }
+    }
+  }
+
 }
 
 
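The DetermineHashedPartitionsJob changes above do three things: the group-by stage now runs one reducer per known segment interval and routes rows with the new DetermineHashedPartitionsPartitioner (falling back to a single partition in local mode or when intervals are still being determined), the MAX_SHARDS cap of 128 is removed so the shard count is simply ceil(cardinality / targetPartitionSize), and the exists() guards are dropped so a missing interval or partition info file now fails the read instead of being logged and silently skipped. A standalone sketch of the routing idea, using plain JDK types and hypothetical names:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the lookup behind DetermineHashedPartitionsPartitioner: each known
// segment interval is keyed by its start time and assigned its own reducer, so
// every reducer builds the cardinality estimate for exactly one interval.
final class IntervalRoutingSketch
{
  private final Map<Long, Integer> reducerLookup = new HashMap<>();
  private final boolean determineIntervals;

  IntervalRoutingSketch(List<Long> intervalStartMillis)
  {
    // No known intervals means the job is still determining them and runs a
    // single reducer anyway.
    this.determineIntervals = intervalStartMillis.isEmpty();
    int reducerNumber = 0;
    for (Long start : intervalStartMillis) {
      reducerLookup.put(start, reducerNumber++);
    }
  }

  int partitionFor(long intervalStartMillis, boolean localMode)
  {
    // Local-mode Hadoop runs everything in one reducer, matching the
    // "mapred.job.tracker" == "local" check in the real partitioner.
    if (localMode || determineIntervals) {
      return 0;
    }
    return reducerLookup.get(intervalStartMillis);
  }
}
```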
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 890a351..ddcb691 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -215,23 +215,20 @@
         if (fileSystem == null) {
           fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
         }
-        if (fileSystem.exists(partitionInfoPath)) {
-          List<ShardSpec> specs = config.jsonMapper.readValue(
-              Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
-          {
-          }
-          );
-
-          List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
-          for (int i = 0; i < specs.size(); ++i) {
-            actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
-            log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
-          }
-
-          shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
-        } else {
-          log.info("Path[%s] didn't exist!?", partitionInfoPath);
+        List<ShardSpec> specs = config.jsonMapper.readValue(
+            Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
+        {
         }
+        );
+
+        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
+        for (int i = 0; i < specs.size(); ++i) {
+          actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
+          log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
+        }
+
+        shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
+
       }
       config.setShardSpecs(shardSpecs);
 
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 2076292..311eec6 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
+import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeComparator;
@@ -56,13 +57,28 @@
     if (config.isDeterminingPartitions()) {
       jobs.add(config.getPartitionsSpec().getPartitionJob(config));
     } else {
+      int shardsPerInterval = config.getPartitionsSpec().getNumShards();
       Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
       int shardCount = 0;
       for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
         DateTime bucket = segmentGranularity.getStart();
-        final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
-        shardSpecs.put(bucket, Lists.newArrayList(spec));
-        log.info("DateTime[%s], spec[%s]", bucket, spec);
+        if (shardsPerInterval > 0) {
+          List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval);
+          for (int i = 0; i < shardsPerInterval; i++) {
+            specs.add(
+                new HadoopyShardSpec(
+                    new HashBasedNumberedShardSpec(i, shardsPerInterval),
+                    shardCount++
+                )
+            );
+          }
+          shardSpecs.put(bucket, specs);
+          log.info("DateTime[%s], spec[%s]", bucket, specs);
+        } else {
+          final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
+          shardSpecs.put(bucket, Lists.newArrayList(spec));
+          log.info("DateTime[%s], spec[%s]", bucket, spec);
+        }
       }
       config.setShardSpecs(shardSpecs);
     }
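
With the change above, a configured numShards (> 0) causes the configuration job to lay out HashBasedNumberedShardSpecs directly, one set per segment interval, instead of a single NoneShardSpec per interval; partition numbers restart at 0 within each interval while shardCount keeps increasing globally so every HadoopyShardSpec gets a unique shard number. A sketch of that per-interval layout, extracted for illustration (class and method names are hypothetical; HadoopyShardSpec is assumed to live in io.druid.indexer, as in the surrounding code):

```java
import com.google.common.collect.Lists;
import io.druid.indexer.HadoopyShardSpec;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;

import java.util.List;

// One interval's worth of specs: partitions 0..shardsPerInterval-1, each
// wrapped with a globally unique, ever-increasing shardNum.
final class FixedShardLayoutSketch
{
  static List<HadoopyShardSpec> specsForInterval(int shardsPerInterval, int startingShardCount)
  {
    int shardCount = startingShardCount;
    final List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval);
    for (int i = 0; i < shardsPerInterval; i++) {
      specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, shardsPerInterval), shardCount++));
    }
    return specs;
  }
}
```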
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
index 90fab3e..e0d7deb 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
@@ -20,6 +20,7 @@
 package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 
 
 public abstract class AbstractPartitionsSpec implements PartitionsSpec
@@ -28,11 +29,13 @@
   private final long targetPartitionSize;
   private final long maxPartitionSize;
   private final boolean assumeGrouped;
+  private final int numShards;
 
   public AbstractPartitionsSpec(
       Long targetPartitionSize,
       Long maxPartitionSize,
-      Boolean assumeGrouped
+      Boolean assumeGrouped,
+      Integer numShards
   )
   {
     this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
@@ -40,6 +43,11 @@
                             ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
                             : maxPartitionSize;
     this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
+    this.numShards = numShards == null ? -1 : numShards;
+    Preconditions.checkArgument(
+        this.targetPartitionSize == -1 || this.numShards == -1,
+        "targetPartitionsSize and shardCount both cannot be set"
+    );
   }
 
   @JsonProperty
@@ -65,4 +73,10 @@
   {
     return targetPartitionSize > 0;
   }
+
+  @Override
+  public int getNumShards()
+  {
+    return numShards;
+  }
 }
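
AbstractPartitionsSpec now tracks numShards alongside targetPartitionSize, using -1 as the "unset" sentinel for both, and rejects specs that set both at once. A minimal standalone sketch of that validation rule (names here are illustrative, not part of the patch):

```java
import com.google.common.base.Preconditions;

// -1 means "not configured"; at most one of targetPartitionSize and numShards
// may be supplied, mirroring the Preconditions check added above.
final class PartitionsSpecValidationSketch
{
  static void validate(Long targetPartitionSize, Integer numShards)
  {
    final long target = targetPartitionSize == null ? -1 : targetPartitionSize;
    final int shards = numShards == null ? -1 : numShards;
    Preconditions.checkArgument(
        target == -1 || shards == -1,
        "targetPartitionSize and numShards cannot both be set"
    );
  }
}
```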
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
index d164cef..ca53b95 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
@@ -33,10 +33,11 @@
   public HashedPartitionsSpec(
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
-      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
+      @JsonProperty("numShards") @Nullable Integer numShards
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
   }
 
   @Override
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
index cce5de8..a36555f 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
@@ -49,4 +49,7 @@
   @JsonIgnore
   public boolean isDeterminingPartitions();
 
+  @JsonProperty
+  public int getNumShards();
+
 }
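
Exposing getNumShards() on the PartitionsSpec interface (as a @JsonProperty) is what lets HadoopDruidDetermineConfigurationJob read the fixed shard count. Illustrative usage under the semantics above (this snippet is not part of the patch): with only numShards set, isDeterminingPartitions() stays false, so no determine-partitions job is scheduled and the hashed shard specs are created up front.

```java
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;

public class NumShardsUsageSketch
{
  public static void main(String[] args)
  {
    // Equivalent to the JSON spec {"type": "hashed", "numShards": 2} exercised
    // in the new test further below.
    final PartitionsSpec spec = new HashedPartitionsSpec(null, null, null, 2);
    System.out.println(spec.isDeterminingPartitions()); // false
    System.out.println(spec.getNumShards());            // 2
  }
}
```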
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
index 30f13f4..777db4c 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
@@ -21,9 +21,6 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import io.druid.indexer.DetermineHashedPartitionsJob;
-import io.druid.indexer.HadoopDruidIndexerConfig;
-import io.druid.indexer.Jobby;
 
 import javax.annotation.Nullable;
 
@@ -35,9 +32,10 @@
   public RandomPartitionsSpec(
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
-      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
+      @JsonProperty("numShards") @Nullable Integer numShards
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
   }
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
index 118d135..7964c1c 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
@@ -41,7 +41,7 @@
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, null);
     this.partitionDimension = partitionDimension;
   }
 
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
index c6bb0ba..ba03ca7 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -216,10 +216,10 @@
         150
     );
 
-    Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
+    Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
     Assert.assertEquals(
         "getPartitionDimension",
-        ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+        ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(),
         "foo"
     );
   }
@@ -262,10 +262,10 @@
         150
     );
 
-    Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
+    Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
     Assert.assertEquals(
         "getPartitionDimension",
-        ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+        ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(),
         "foo"
     );
   }
@@ -311,10 +311,10 @@
         200
     );
 
-    Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
+    Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
     Assert.assertEquals(
         "getPartitionDimension",
-        ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+        ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(),
         "foo"
     );
   }
@@ -503,7 +503,8 @@
   }
 
   @Test
-  public void testRandomPartitionsSpec() throws Exception{
+  public void testRandomPartitionsSpec() throws Exception
+  {
     {
       final HadoopDruidIndexerConfig cfg;
 
@@ -542,12 +543,13 @@
           150
       );
 
-      Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof RandomPartitionsSpec);
+      Assert.assertTrue("partitionsSpec", partitionsSpec instanceof RandomPartitionsSpec);
     }
   }
 
   @Test
-  public void testHashedPartitionsSpec() throws Exception{
+  public void testHashedPartitionsSpec() throws Exception
+  {
     {
       final HadoopDruidIndexerConfig cfg;
 
@@ -586,7 +588,57 @@
           150
       );
 
-      Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof HashedPartitionsSpec);
+      Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
     }
   }
+
+  @Test
+  public void testHashedPartitionsSpecShardCount() throws Exception
+  {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonReadWriteRead(
+          "{"
+          + "\"partitionsSpec\":{"
+          + "   \"type\":\"hashed\","
+          + "   \"numShards\":2"
+          + " }"
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    }
+    catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        false
+    );
+
+    Assert.assertEquals(
+        "getTargetPartitionSize",
+        partitionsSpec.getTargetPartitionSize(),
+        -1
+    );
+
+    Assert.assertEquals(
+        "getMaxPartitionSize",
+        partitionsSpec.getMaxPartitionSize(),
+        -1
+    );
+
+    Assert.assertEquals(
+        "shardCount",
+        partitionsSpec.getNumShards(),
+        2
+    );
+
+    Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
+
+  }
 }
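
One further check that could sit alongside the new test (it is not part of this patch): the Preconditions guard added in AbstractPartitionsSpec should reject a spec that sets both targetPartitionSize and numShards, surfacing as an IllegalArgumentException on direct construction. Written as a method inside HadoopDruidIndexerConfigTest, which already imports the types used:

```java
@Test
public void testTargetPartitionSizeAndNumShardsAreMutuallyExclusive()
{
  boolean thrown = false;
  try {
    // Both targetPartitionSize and numShards supplied; the constructor's
    // Preconditions.checkArgument should fail.
    new HashedPartitionsSpec(100L, null, null, 2);
  }
  catch (IllegalArgumentException e) {
    thrown = true;
  }
  Assert.assertTrue("setting both targetPartitionSize and numShards should fail", thrown);
}
```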
diff --git a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
index b97738a..bef339a 100644
--- a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
+++ b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
@@ -25,6 +25,7 @@
 import io.druid.db.DbConnector;
 import io.druid.db.DbTablesConfig;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -104,7 +105,7 @@
                     .bind("created_date", new DateTime().toString())
                     .bind("start", segment.getInterval().getStart().toString())
                     .bind("end", segment.getInterval().getEnd().toString())
-                    .bind("partitioned", segment.getShardSpec().getPartitionNum())
+                    .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
                     .bind("version", segment.getVersion())
                     .bind("used", true)
                     .bind("payload", jsonMapper.writeValueAsString(segment))