fix day 1 issue with VIT
diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
index 6e3c052..bb1b18e 100644
--- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
+++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
@@ -56,7 +56,6 @@
import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.druid.result.BySegmentResultValueClass;
import com.metamx.druid.result.Result;
-
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -121,7 +120,8 @@
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null;
- final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true")) && strategy != null;
+ final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
+ && strategy != null;
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
final Query<T> rewrittenQuery;
@@ -161,22 +161,22 @@
}
final byte[] queryCacheKey;
- if(strategy != null) {
+ if (strategy != null) {
queryCacheKey = strategy.computeCacheKey(query);
} else {
queryCacheKey = null;
}
// Pull cached segments from cache and remove from set of segments to query
- if(useCache && queryCacheKey != null) {
+ if (useCache && queryCacheKey != null) {
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
- for(Pair<ServerSelector, SegmentDescriptor> e : segments) {
+ for (Pair<ServerSelector, SegmentDescriptor> e : segments) {
cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey));
}
Map<Cache.NamedKey, byte[]> cachedValues = cache.getBulk(cacheKeys.values());
- for(Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
+ for (Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
Cache.NamedKey segmentCacheKey = entry.getValue();
@@ -191,8 +191,7 @@
// remove cached segment from set of segments to query
segments.remove(segment);
- }
- else {
+ } else {
final String segmentIdentifier = selector.getSegment().getIdentifier();
cachePopulatorMap.put(
String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
@@ -203,22 +202,22 @@
}
// Compile list of all segments not pulled from cache
- for(Pair<ServerSelector, SegmentDescriptor> segment : segments) {
+ for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
final QueryableDruidServer queryableDruidServer = segment.lhs.pick();
if (queryableDruidServer == null) {
log.error("No servers found for %s?! How can this be?!", segment.rhs);
+ } else {
+ final DruidServer server = queryableDruidServer.getServer();
+ List<SegmentDescriptor> descriptors = serverSegments.get(server);
+
+ if (descriptors == null) {
+ descriptors = Lists.newArrayList();
+ serverSegments.put(server, descriptors);
+ }
+
+ descriptors.add(segment.rhs);
}
-
- final DruidServer server = queryableDruidServer.getServer();
- List<SegmentDescriptor> descriptors = serverSegments.get(server);
-
- if (descriptors == null) {
- descriptors = Lists.newArrayList();
- serverSegments.put(server, descriptors);
- }
-
- descriptors.add(segment.rhs);
}
return new LazySequence<T>(
@@ -242,8 +241,7 @@
);
if (strategy == null) {
return toolChest.mergeSequences(seq);
- }
- else {
+ } else {
return strategy.mergeSequences(seq);
}
}
@@ -356,7 +354,11 @@
);
}
- private Cache.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey)
+ private Cache.NamedKey computeSegmentCacheKey(
+ String segmentIdentifier,
+ SegmentDescriptor descriptor,
+ byte[] queryCacheKey
+ )
{
final Interval segmentQueryInterval = descriptor.getInterval();
final byte[] versionBytes = descriptor.getVersion().getBytes();
diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
index dc1156f..c98091e 100644
--- a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
+++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
@@ -149,9 +149,8 @@
public DruidServer addInventory(final DruidServer container, String inventoryKey, final DataSegment inventory)
{
log.info("Server[%s] added segment[%s]", container.getName(), inventoryKey);
- final DataSegment segment = container.getSegment(inventoryKey);
- if (segment != null) {
+ if (container.getSegment(inventoryKey) != null) {
log.warn(
"Not adding or running callbacks for existing segment[%s] on server[%s]",
inventoryKey,
diff --git a/common/src/main/java/com/metamx/druid/VersionedIntervalTimeline.java b/common/src/main/java/com/metamx/druid/VersionedIntervalTimeline.java
index bee025c..fcbdfc6 100644
--- a/common/src/main/java/com/metamx/druid/VersionedIntervalTimeline.java
+++ b/common/src/main/java/com/metamx/druid/VersionedIntervalTimeline.java
@@ -26,6 +26,7 @@
import com.metamx.druid.partition.ImmutablePartitionHolder;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.partition.PartitionHolder;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.ArrayList;
@@ -78,7 +79,7 @@
this.versionComparator = versionComparator;
}
- public void add(Interval interval, VersionType version, PartitionChunk<ObjectType> object)
+ public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
{
try {
lock.writeLock().lock();
@@ -278,6 +279,13 @@
addIntervalToTimeline(interval, entry, timeline);
}
+ /**
+ *
+ * @param timeline
+ * @param key
+ * @param entry
+ * @return boolean flag indicating whether or not we inserted or discarded something
+ */
private boolean addAtKey(
NavigableMap<Interval, TimelineEntry> timeline,
Interval key,
@@ -292,7 +300,7 @@
return false;
}
- while (currKey != null && currKey.overlaps(entryInterval)) {
+ while (entryInterval != null && currKey != null && currKey.overlaps(entryInterval)) {
Interval nextKey = timeline.higherKey(currKey);
int versionCompare = versionComparator.compare(
@@ -311,7 +319,7 @@
if (entryInterval.getEnd().isAfter(currKey.getEnd())) {
entryInterval = new Interval(currKey.getEnd(), entryInterval.getEnd());
} else {
- entryInterval = null;
+ entryInterval = null; // discard this entry
}
}
} else if (versionCompare > 0) {
@@ -491,4 +499,9 @@
return partitionHolder;
}
}
+
+ public static void main(String[] args)
+ {
+ System.out.println(new Interval(new DateTime(), (DateTime) null));
+ }
}
diff --git a/common/src/test/java/com/metamx/druid/VersionedIntervalTimelineTest.java b/common/src/test/java/com/metamx/druid/VersionedIntervalTimelineTest.java
index 96d0fa0..80068a6 100644
--- a/common/src/test/java/com/metamx/druid/VersionedIntervalTimelineTest.java
+++ b/common/src/test/java/com/metamx/druid/VersionedIntervalTimelineTest.java
@@ -1068,6 +1068,27 @@
);
}
+ // |----3---||---1---|
+ // |---2---|
+ @Test
+ public void testOverlapCausesNullEntries() throws Exception
+ {
+ timeline = makeStringIntegerTimeline();
+
+ add("2011-01-01T12/2011-01-02", "3", 3);
+ add("2011-01-02/3011-01-03", "1", 1);
+ add("2011-01-01/2011-01-02", "2", 2);
+
+ assertValues(
+ Arrays.asList(
+ createExpected("2011-01-01/2011-01-01T12", "2", 2),
+ createExpected("2011-01-01T12/2011-01-02", "3", 3),
+ createExpected("2011-01-02/3011-01-03", "1", 1)
+ ),
+ timeline.lookup(new Interval("2011-01-01/3011-01-03"))
+ );
+ }
+
// 1|----| |----|
// 2|------| |------|
// 3|------------------|