Merge pull request #147 from metamx/hadoop-max-partitions-config
Allow configurable maxPartitionSize in the Hadoop indexer
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
index f34ff29..425b33c 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
@@ -504,7 +504,6 @@
public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer
{
private static final double SHARD_COMBINE_THRESHOLD = 0.25;
- private static final double SHARD_OVERSIZE_THRESHOLD = 1.5;
private static final int HIGH_CARDINALITY_THRESHOLD = 3000000;
@Override
@@ -672,7 +671,7 @@
// Make sure none of these shards are oversized
boolean oversized = false;
for(final DimPartition partition : dimPartitions.partitions) {
- if(partition.rows > config.getTargetPartitionSize() * SHARD_OVERSIZE_THRESHOLD) {
+ if(partition.rows > config.getMaxPartitionSize()) {
log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
oversized = true;
}
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
index 1dfad9d..364b880 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
@@ -236,7 +236,7 @@
this.partitionsSpec = partitionsSpec;
} else {
// Backwards compatibility
- this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, false);
+ this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, null, false);
}
if(granularitySpec != null) {
@@ -431,6 +431,11 @@
return partitionsSpec.getTargetPartitionSize();
}
+ /**
+  * Returns the row count above which a partition is considered oversized when determining
+  * partitions. Delegates to the configured {@code PartitionsSpec}.
+  */
+ public long getMaxPartitionSize()
+ {
+ return partitionsSpec.getMaxPartitionSize();
+ }
+
public boolean isUpdaterJobSpecSet()
{
return (updaterJobSpec != null);
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java b/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java
index e30bad3..5571422 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java
@@ -8,22 +8,30 @@
public class PartitionsSpec
{
+ private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
+
@Nullable
private final String partitionDimension;
private final long targetPartitionSize;
+ private final long maxPartitionSize;
+
private final boolean assumeGrouped;
+ /**
+  * Creates a partitioning spec. Every argument is nullable and falls back to a default.
+  *
+  * @param partitionDimension  dimension to partition on, or null
+  * @param targetPartitionSize desired rows per partition; null is stored as -1
+  * @param maxPartitionSize    rows above which a partition counts as oversized; null defaults to
+  *                            targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD, truncated to a long
+  * @param assumeGrouped       null is treated as false
+  * @throws IllegalArgumentException if targetPartitionSize is positive and maxPartitionSize is
+  *                                  smaller than it
+  */
@JsonCreator
public PartitionsSpec(
@JsonProperty("partitionDimension") @Nullable String partitionDimension,
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
+ @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
)
{
this.partitionDimension = partitionDimension;
this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
+ // The default keeps the previously hard-coded rule: a shard is oversized above 1.5x the target.
+ this.maxPartitionSize = maxPartitionSize == null
+ ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
+ : maxPartitionSize;
+ // A max below the target contradicts it: a shard of exactly the target size would already be
+ // "oversized". Reject that misconfiguration up front instead of failing during partitioning.
+ if (this.targetPartitionSize > 0 && this.maxPartitionSize < this.targetPartitionSize) {
+ throw new IllegalArgumentException(
+ String.format(
+ "maxPartitionSize[%s] must be >= targetPartitionSize[%s]",
+ this.maxPartitionSize,
+ this.targetPartitionSize
+ )
+ );
+ }
this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
}
@@ -47,6 +55,12 @@
}
+ /**
+  * Returns the row count above which a partition is treated as oversized. When not configured,
+  * this is derived from the target partition size multiplied by DEFAULT_OVERSIZE_THRESHOLD
+  * (1.5), truncated to a long.
+  */
@JsonProperty
+ public long getMaxPartitionSize()
+ {
+ return maxPartitionSize;
+ }
+
+ @JsonProperty
public boolean isAssumeGrouped()
{
return assumeGrouped;
diff --git a/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
index 5fdff8c..87ee95f 100644
--- a/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -249,6 +249,12 @@
);
Assert.assertEquals(
+ "getMaxPartitionSize",
+ partitionsSpec.getMaxPartitionSize(),
+ 150
+ );
+
+ Assert.assertEquals(
"getPartitionDimension",
partitionsSpec.getPartitionDimension(),
"foo"
@@ -286,6 +292,58 @@
);
Assert.assertEquals(
+ "getMaxPartitionSize",
+ partitionsSpec.getMaxPartitionSize(),
+ 150
+ );
+
+ Assert.assertEquals(
+ "getPartitionDimension",
+ partitionsSpec.getPartitionDimension(),
+ "foo"
+ );
+ }
+
+ @Test
+ public void testPartitionsSpecMaxPartitionSize() {
+ final HadoopDruidIndexerConfig cfg;
+
+ try {
+ cfg = jsonMapper.readValue(
+ "{"
+ + "\"partitionsSpec\":{"
+ + " \"targetPartitionSize\":100,"
+ + " \"maxPartitionSize\":200,"
+ + " \"partitionDimension\":\"foo\""
+ + " }"
+ + "}",
+ HadoopDruidIndexerConfig.class
+ );
+ } catch(Exception e) {
+ throw Throwables.propagate(e);
+ }
+
+ final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+ Assert.assertEquals(
+ "isDeterminingPartitions",
+ partitionsSpec.isDeterminingPartitions(),
+ true
+ );
+
+ Assert.assertEquals(
+ "getTargetPartitionSize",
+ partitionsSpec.getTargetPartitionSize(),
+ 100
+ );
+
+ Assert.assertEquals(
+ "getMaxPartitionSize",
+ partitionsSpec.getMaxPartitionSize(),
+ 200
+ );
+
+ Assert.assertEquals(
"getPartitionDimension",
partitionsSpec.getPartitionDimension(),
"foo"