Merge pull request #500 from metamx/batch-ingestion-fixes
Batch ingestion fixes
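
Summary of the changes in the diff below:

- DbUpdaterJob and DbSegmentPublisher now write the "partitioned" column as a
  0/1 flag (0 for NoneShardSpec, 1 otherwise) instead of the shard's partition
  number.
- DetermineHashedPartitionsJob drops the MAX_SHARDS limit and adds a custom
  partitioner; when the segment granularity intervals are known up front, the
  group-by job runs one reducer per interval and routes rows by interval start.
- Missing partition-info paths are no longer silently skipped in
  DetermineHashedPartitionsJob and DeterminePartitionsJob; reading them now
  fails outright instead of logging and moving on.
- Hashed and random partitions specs accept a new "numShards" property. When
  it is set (and targetPartitionSize is not), the partition-determination job
  is skipped and one HashBasedNumberedShardSpec per shard is created up front
  for each interval. numShards and targetPartitionSize are mutually exclusive.

For example, a hashed partitions spec using the new property looks like the
one exercised by the added test case (other indexer config fields omitted
here for brevity):

    {
      "partitionsSpec": {
        "type": "hashed",
        "numShards": 2
      }
    }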
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
index 543d2c4..a238d5e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
@@ -23,6 +23,7 @@
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
@@ -80,7 +81,7 @@
.put("created_date", new DateTime().toString())
.put("start", segment.getInterval().getStart().toString())
.put("end", segment.getInterval().getEnd().toString())
- .put("partitioned", segment.getShardSpec().getPartitionNum())
+ .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
.put("version", segment.getVersion())
.put("used", true)
.put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment))
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index ae2d61a..530d155 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -37,6 +37,7 @@
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -65,7 +67,6 @@
*/
public class DetermineHashedPartitionsJob implements Jobby
{
- private static final int MAX_SHARDS = 128;
private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
private final HadoopDruidIndexerConfig config;
@@ -98,8 +99,11 @@
groupByJob.setOutputKeyClass(NullWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+ groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
if (!config.getSegmentGranularIntervals().isPresent()) {
groupByJob.setNumReduceTasks(1);
+ } else {
+ groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
}
JobHelper.setupClasspath(config, groupByJob);
@@ -124,9 +128,6 @@
if (!config.getSegmentGranularIntervals().isPresent()) {
final Path intervalInfoPath = config.makeIntervalInfoPath();
fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
- if (!fileSystem.exists(intervalInfoPath)) {
- throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
- }
List<Interval> intervals = config.jsonMapper.readValue(
Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
{
@@ -144,37 +145,25 @@
if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
}
- if (fileSystem.exists(partitionInfoPath)) {
- Long cardinality = config.jsonMapper.readValue(
- Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
- {
- }
- );
- int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
-
- if (numberOfShards > MAX_SHARDS) {
- throw new ISE(
- "Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
- numberOfShards,
- MAX_SHARDS
- );
- }
-
- List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
- if (numberOfShards == 1) {
- actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
- } else {
- for (int i = 0; i < numberOfShards; ++i) {
- actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
- log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
- }
- }
-
- shardSpecs.put(bucket, actualSpecs);
-
- } else {
- log.info("Path[%s] didn't exist!?", partitionInfoPath);
+ final Long cardinality = config.jsonMapper.readValue(
+ Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
+ {
}
+ );
+ final int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
+
+ List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
+ if (numberOfShards == 1) {
+ actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
+ } else {
+ for (int i = 0; i < numberOfShards; ++i) {
+ actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
+ log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
+ }
+ }
+
+ shardSpecs.put(bucket, actualSpecs);
+
}
config.setShardSpecs(shardSpecs);
log.info(
@@ -319,13 +308,6 @@
}
}
- private byte[] getDataBytes(BytesWritable writable)
- {
- byte[] rv = new byte[writable.getLength()];
- System.arraycopy(writable.getBytes(), 0, rv, 0, writable.getLength());
- return rv;
- }
-
@Override
public void run(Context context)
throws IOException, InterruptedException
@@ -353,6 +335,50 @@
}
}
}
+
+ public static class DetermineHashedPartitionsPartitioner
+ extends Partitioner<LongWritable, BytesWritable> implements Configurable
+ {
+ private Configuration config;
+ private boolean determineIntervals;
+ private Map<LongWritable, Integer> reducerLookup;
+
+ @Override
+ public int getPartition(LongWritable interval, BytesWritable text, int numPartitions)
+ {
+
+ if (config.get("mapred.job.tracker").equals("local") || determineIntervals) {
+ return 0;
+ } else {
+ return reducerLookup.get(interval);
+ }
+ }
+
+ @Override
+ public Configuration getConf()
+ {
+ return config;
+ }
+
+ @Override
+ public void setConf(Configuration config)
+ {
+ this.config = config;
+ HadoopDruidIndexerConfig hadoopConfig = HadoopDruidIndexerConfigBuilder.fromConfiguration(config);
+ if (hadoopConfig.getSegmentGranularIntervals().isPresent()) {
+ determineIntervals = false;
+ int reducerNumber = 0;
+ ImmutableMap.Builder<LongWritable, Integer> builder = ImmutableMap.builder();
+ for (Interval interval : hadoopConfig.getSegmentGranularIntervals().get()) {
+ builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++);
+ }
+ reducerLookup = builder.build();
+ } else {
+ determineIntervals = true;
+ }
+ }
+ }
+
}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 890a351..ddcb691 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -215,23 +215,20 @@
if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
}
- if (fileSystem.exists(partitionInfoPath)) {
- List<ShardSpec> specs = config.jsonMapper.readValue(
- Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
- {
- }
- );
-
- List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
- for (int i = 0; i < specs.size(); ++i) {
- actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
- log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
- }
-
- shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
- } else {
- log.info("Path[%s] didn't exist!?", partitionInfoPath);
+ List<ShardSpec> specs = config.jsonMapper.readValue(
+ Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
+ {
}
+ );
+
+ List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
+ for (int i = 0; i < specs.size(); ++i) {
+ actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
+ log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
+ }
+
+ shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
+
}
config.setShardSpecs(shardSpecs);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 2076292..311eec6 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -23,6 +23,7 @@
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
+import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
@@ -56,13 +57,28 @@
if (config.isDeterminingPartitions()) {
jobs.add(config.getPartitionsSpec().getPartitionJob(config));
} else {
+ int shardsPerInterval = config.getPartitionsSpec().getNumShards();
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart();
- final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
- shardSpecs.put(bucket, Lists.newArrayList(spec));
- log.info("DateTime[%s], spec[%s]", bucket, spec);
+ if (shardsPerInterval > 0) {
+ List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval);
+ for (int i = 0; i < shardsPerInterval; i++) {
+ specs.add(
+ new HadoopyShardSpec(
+ new HashBasedNumberedShardSpec(i, shardsPerInterval),
+ shardCount++
+ )
+ );
+ }
+ shardSpecs.put(bucket, specs);
+ log.info("DateTime[%s], spec[%s]", bucket, specs);
+ } else {
+ final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
+ shardSpecs.put(bucket, Lists.newArrayList(spec));
+ log.info("DateTime[%s], spec[%s]", bucket, spec);
+ }
}
config.setShardSpecs(shardSpecs);
}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
index 90fab3e..e0d7deb 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
@@ -20,6 +20,7 @@
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
public abstract class AbstractPartitionsSpec implements PartitionsSpec
@@ -28,11 +29,13 @@
private final long targetPartitionSize;
private final long maxPartitionSize;
private final boolean assumeGrouped;
+ private final int numShards;
public AbstractPartitionsSpec(
Long targetPartitionSize,
Long maxPartitionSize,
- Boolean assumeGrouped
+ Boolean assumeGrouped,
+ Integer numShards
)
{
this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
@@ -40,6 +43,11 @@
? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
: maxPartitionSize;
this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
+ this.numShards = numShards == null ? -1 : numShards;
+ Preconditions.checkArgument(
+ this.targetPartitionSize == -1 || this.numShards == -1,
+ "targetPartitionsSize and shardCount both cannot be set"
+ );
}
@JsonProperty
@@ -65,4 +73,10 @@
{
return targetPartitionSize > 0;
}
+
+ @Override
+ public int getNumShards()
+ {
+ return numShards;
+ }
}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
index d164cef..ca53b95 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
@@ -33,10 +33,11 @@
public HashedPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
- @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+ @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
+ @JsonProperty("numShards") @Nullable Integer numShards
)
{
- super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+ super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
}
@Override
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
index cce5de8..a36555f 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
@@ -49,4 +49,7 @@
@JsonIgnore
public boolean isDeterminingPartitions();
+ @JsonProperty
+ public int getNumShards();
+
}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
index 30f13f4..777db4c 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
@@ -21,9 +21,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import io.druid.indexer.DetermineHashedPartitionsJob;
-import io.druid.indexer.HadoopDruidIndexerConfig;
-import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
@@ -35,9 +32,10 @@
public RandomPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
- @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+ @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
+ @JsonProperty("numShards") @Nullable Integer numShards
)
{
- super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+ super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
}
}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
index 118d135..7964c1c 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
@@ -41,7 +41,7 @@
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
)
{
- super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+ super(targetPartitionSize, maxPartitionSize, assumeGrouped, null);
this.partitionDimension = partitionDimension;
}
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
index c6bb0ba..ba03ca7 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -216,10 +216,10 @@
150
);
- Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
+ Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
- ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+ ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(),
"foo"
);
}
@@ -262,10 +262,10 @@
150
);
- Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
+ Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
- ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+ ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(),
"foo"
);
}
@@ -311,10 +311,10 @@
200
);
- Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
+ Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
- ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+ ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(),
"foo"
);
}
@@ -503,7 +503,8 @@
}
@Test
- public void testRandomPartitionsSpec() throws Exception{
+ public void testRandomPartitionsSpec() throws Exception
+ {
{
final HadoopDruidIndexerConfig cfg;
@@ -542,12 +543,13 @@
150
);
- Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof RandomPartitionsSpec);
+ Assert.assertTrue("partitionsSpec", partitionsSpec instanceof RandomPartitionsSpec);
}
}
@Test
- public void testHashedPartitionsSpec() throws Exception{
+ public void testHashedPartitionsSpec() throws Exception
+ {
{
final HadoopDruidIndexerConfig cfg;
@@ -586,7 +588,57 @@
150
);
- Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof HashedPartitionsSpec);
+ Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
}
}
+
+ @Test
+ public void testHashedPartitionsSpecShardCount() throws Exception
+ {
+ final HadoopDruidIndexerConfig cfg;
+
+ try {
+ cfg = jsonReadWriteRead(
+ "{"
+ + "\"partitionsSpec\":{"
+ + " \"type\":\"hashed\","
+ + " \"numShards\":2"
+ + " }"
+ + "}",
+ HadoopDruidIndexerConfig.class
+ );
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+
+ final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+ Assert.assertEquals(
+ "isDeterminingPartitions",
+ partitionsSpec.isDeterminingPartitions(),
+ false
+ );
+
+ Assert.assertEquals(
+ "getTargetPartitionSize",
+ partitionsSpec.getTargetPartitionSize(),
+ -1
+ );
+
+ Assert.assertEquals(
+ "getMaxPartitionSize",
+ partitionsSpec.getMaxPartitionSize(),
+ -1
+ );
+
+ Assert.assertEquals(
+ "shardCount",
+ partitionsSpec.getNumShards(),
+ 2
+ );
+
+ Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
+
+ }
}
diff --git a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
index b97738a..bef339a 100644
--- a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
+++ b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
@@ -25,6 +25,7 @@
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
@@ -104,7 +105,7 @@
.bind("created_date", new DateTime().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
- .bind("partitioned", segment.getShardSpec().getPartitionNum())
+ .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", jsonMapper.writeValueAsString(segment))