Merge pull request #147 from metamx/hadoop-max-partitions-config

Allow variable maxPartitionSize in Hadoop indexer
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
index f34ff29..425b33c 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
@@ -504,7 +504,6 @@
   public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer
   {
     private static final double SHARD_COMBINE_THRESHOLD = 0.25;
-    private static final double SHARD_OVERSIZE_THRESHOLD = 1.5;
     private static final int HIGH_CARDINALITY_THRESHOLD = 3000000;
 
     @Override
@@ -672,7 +671,7 @@
         // Make sure none of these shards are oversized
         boolean oversized = false;
         for(final DimPartition partition : dimPartitions.partitions) {
-          if(partition.rows > config.getTargetPartitionSize() * SHARD_OVERSIZE_THRESHOLD) {
+          if(partition.rows > config.getMaxPartitionSize()) {
             log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
             oversized = true;
           }
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
index 1dfad9d..364b880 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
@@ -236,7 +236,7 @@
       this.partitionsSpec = partitionsSpec;
     } else {
       // Backwards compatibility
-      this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, false);
+      this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, null, false);
     }
 
     if(granularitySpec != null) {
@@ -431,6 +431,11 @@
     return partitionsSpec.getTargetPartitionSize();
   }
 
+  public long getMaxPartitionSize()
+  {
+    return partitionsSpec.getMaxPartitionSize();
+  }
+
   public boolean isUpdaterJobSpecSet()
   {
     return (updaterJobSpec != null);
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java b/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java
index e30bad3..5571422 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java
@@ -8,22 +8,30 @@
 
 public class PartitionsSpec
 {
+  private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
+
   @Nullable
   private final String partitionDimension;
 
   private final long targetPartitionSize;
 
+  private final long maxPartitionSize;
+
   private final boolean assumeGrouped;
 
   @JsonCreator
   public PartitionsSpec(
       @JsonProperty("partitionDimension") @Nullable String partitionDimension,
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
+      @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
   )
   {
     this.partitionDimension = partitionDimension;
     this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
+    this.maxPartitionSize = maxPartitionSize == null
+                            ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
+                            : maxPartitionSize;
     this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
   }
 
@@ -47,6 +55,12 @@
   }
 
   @JsonProperty
+  public long getMaxPartitionSize()
+  {
+    return maxPartitionSize;
+  }
+
+  @JsonProperty
   public boolean isAssumeGrouped()
   {
     return assumeGrouped;
diff --git a/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
index 5fdff8c..87ee95f 100644
--- a/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -249,6 +249,12 @@
     );
 
     Assert.assertEquals(
+        "getMaxPartitionSize",
+        partitionsSpec.getMaxPartitionSize(),
+        150
+    );
+
+    Assert.assertEquals(
         "getPartitionDimension",
         partitionsSpec.getPartitionDimension(),
         "foo"
@@ -286,6 +292,58 @@
     );
 
     Assert.assertEquals(
+        "getMaxPartitionSize",
+        partitionsSpec.getMaxPartitionSize(),
+        150
+    );
+
+    Assert.assertEquals(
+        "getPartitionDimension",
+        partitionsSpec.getPartitionDimension(),
+        "foo"
+    );
+  }
+
+  @Test
+  public void testPartitionsSpecMaxPartitionSize() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonMapper.readValue(
+          "{"
+          + "\"partitionsSpec\":{"
+          + "   \"targetPartitionSize\":100,"
+          + "   \"maxPartitionSize\":200,"
+          + "   \"partitionDimension\":\"foo\""
+          + " }"
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        true
+    );
+
+    Assert.assertEquals(
+        "getTargetPartitionSize",
+        partitionsSpec.getTargetPartitionSize(),
+        100
+    );
+
+    Assert.assertEquals(
+        "getMaxPartitionSize",
+        partitionsSpec.getMaxPartitionSize(),
+        200
+    );
+
+    Assert.assertEquals(
         "getPartitionDimension",
         partitionsSpec.getPartitionDimension(),
         "foo"