Merge pull request #712 from metamx/indexing-extra-classpaths
Allow indexing tasks to specify extra classpaths.
diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java
index ac2a5f8..49a8fb1 100644
--- a/processing/src/main/java/io/druid/query/TimewarpOperator.java
+++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
-import com.google.common.collect.ImmutableMap;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedRow;
@@ -80,7 +79,7 @@
return new QueryRunner<T>()
{
@Override
- public Sequence<T> run(Query<T> query)
+ public Sequence<T> run(final Query<T> query)
{
final long offset = computeOffset(now);
@@ -103,12 +102,19 @@
Object value = res.getValue();
if (value instanceof TimeBoundaryResultValue) {
TimeBoundaryResultValue boundary = (TimeBoundaryResultValue) value;
- value = new TimeBoundaryResultValue(
- ImmutableMap.of(
- TimeBoundaryQuery.MIN_TIME, boundary.getMinTime().minus(offset),
- TimeBoundaryQuery.MAX_TIME, new DateTime(Math.min(boundary.getMaxTime().getMillis() - offset, now))
- )
- );
+
+ DateTime minTime = null;
+ try{
+ minTime = boundary.getMinTime();
+ } catch(IllegalArgumentException e) {} // NOTE(review): swallowed on purpose so minTime stays null; presumably getMinTime() throws when the result carries no minTime - confirm
+
+ final DateTime maxTime = boundary.getMaxTime();
+
+ return (T) ((TimeBoundaryQuery) query).buildResult(
+ new DateTime(Math.min(res.getTimestamp().getMillis() - offset, now)), // result timestamp with offset subtracted, capped at now
+ minTime != null ? minTime.minus(offset) : null, // offset is subtracted only when a minTime was obtained above
+ maxTime != null ? new DateTime(Math.min(maxTime.getMillis() - offset, now)) : null // same subtract-and-cap as the timestamp; null is passed through
+ ).iterator().next();
}
return (T) new Result(res.getTimestamp().minus(offset), value);
} else if (input instanceof MapBasedRow) {
diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index fdde44b..088ec0b 100644
--- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -145,9 +145,10 @@
@Override
public byte[] computeCacheKey(TimeBoundaryQuery query)
{
- return ByteBuffer.allocate(2)
+ final byte[] cacheKey = query.getCacheKey(); // fetched once so its length can size the buffer on the next line
+ return ByteBuffer.allocate(1 + cacheKey.length) // 1 for the TIMEBOUNDARY_QUERY marker (presumably a single byte - confirm) plus the query's key bytes; replaces the fixed size of 2
.put(TIMEBOUNDARY_QUERY)
- .put(query.getCacheKey())
+ .put(cacheKey)
.array();
}
diff --git a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java
index 4dfb8bb..f1703e8 100644
--- a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java
+++ b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java
@@ -26,6 +26,7 @@
import com.metamx.common.guava.Sequences;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.timeboundary.TimeBoundaryResultValue;
import io.druid.query.timeseries.TimeseriesResultValue;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -122,5 +123,46 @@
Sequences.toList(queryRunner.run(query), Lists.<Result<TimeseriesResultValue>>newArrayList())
);
+
+ TimewarpOperator<Result<TimeBoundaryResultValue>> timeBoundaryOperator = new TimewarpOperator<>(
+ new Interval(new DateTime("2014-01-01"), new DateTime("2014-01-15")),
+ new Period("P1W"),
+ new DateTime("2014-01-06") // align on Monday
+ );
+
+ QueryRunner<Result<TimeBoundaryResultValue>> timeBoundaryRunner = timeBoundaryOperator.postProcess(
+ new QueryRunner<Result<TimeBoundaryResultValue>>()
+ {
+ @Override
+ public Sequence<Result<TimeBoundaryResultValue>> run(Query<Result<TimeBoundaryResultValue>> query)
+ {
+ return Sequences.simple(
+ ImmutableList.of(
+ new Result<>(
+ new DateTime("2014-01-12"),
+ new TimeBoundaryResultValue(ImmutableMap.<String, Object>of("maxTime", new DateTime("2014-01-12"))) // stubbed result holds only a maxTime entry, no minTime
+ )
+ )
+ );
+ }
+ },
+ new DateTime("2014-08-02").getMillis() // second postProcess argument; presumably the "now" the operator shifts results toward - confirm
+ );
+
+ final Query<Result<TimeBoundaryResultValue>> timeBoundaryQuery =
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource("dummy")
+ .build();
+
+ Assert.assertEquals(
+ Lists.newArrayList(
+ new Result<>( // expected: both the result timestamp and maxTime are 2014-08-02, and the value still has no minTime entry
+ new DateTime("2014-08-02"),
+ new TimeBoundaryResultValue(ImmutableMap.<String, Object>of("maxTime", new DateTime("2014-08-02")))
+ )
+ ),
+ Sequences.toList(timeBoundaryRunner.run(timeBoundaryQuery), Lists.<Result<TimeBoundaryResultValue>>newArrayList())
+ );
+
}
}
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 8437261..f99f3d4 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -929,6 +929,48 @@
new Interval("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), new DateTime("2011-01-10"))
);
+
+ testQueryCaching(
+ client,
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(CachingClusteredClientTest.DATA_SOURCE)
+ .intervals(CachingClusteredClientTest.SEG_SPEC)
+ .context(CachingClusteredClientTest.CONTEXT)
+ .bound(TimeBoundaryQuery.MAX_TIME) // max-only bound: every expected result below passes null as the middle (presumably minTime) argument
+ .build(),
+ new Interval("2011-01-01/2011-01-02"),
+ makeTimeBoundaryResult(new DateTime("2011-01-01"), null, new DateTime("2011-01-02")),
+
+ new Interval("2011-01-01/2011-01-03"),
+ makeTimeBoundaryResult(new DateTime("2011-01-02"), null, new DateTime("2011-01-03")),
+
+ new Interval("2011-01-01/2011-01-10"),
+ makeTimeBoundaryResult(new DateTime("2011-01-05"), null, new DateTime("2011-01-10")),
+
+ new Interval("2011-01-01/2011-01-10"),
+ makeTimeBoundaryResult(new DateTime("2011-01-05T01"), null, new DateTime("2011-01-10"))
+ );
+
+ testQueryCaching(
+ client,
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(CachingClusteredClientTest.DATA_SOURCE)
+ .intervals(CachingClusteredClientTest.SEG_SPEC)
+ .context(CachingClusteredClientTest.CONTEXT)
+ .bound(TimeBoundaryQuery.MIN_TIME) // min-only bound: every expected result below passes null as the last (maxTime) argument
+ .build(),
+ new Interval("2011-01-01/2011-01-02"),
+ makeTimeBoundaryResult(new DateTime("2011-01-01"), new DateTime("2011-01-01"), null),
+
+ new Interval("2011-01-01/2011-01-03"),
+ makeTimeBoundaryResult(new DateTime("2011-01-02"), new DateTime("2011-01-02"), null),
+
+ new Interval("2011-01-01/2011-01-10"),
+ makeTimeBoundaryResult(new DateTime("2011-01-05"), new DateTime("2011-01-05"), null),
+
+ new Interval("2011-01-01/2011-01-10"),
+ makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), null)
+ );
}
private Iterable<Result<TimeBoundaryResultValue>> makeTimeBoundaryResult(
@@ -937,17 +979,30 @@
DateTime maxTime
)
{
+ final Object value; // map handed to TimeBoundaryResultValue; holds only the bounds that are non-null
+ if (minTime != null && maxTime != null) {
+ value = ImmutableMap.of(
+ TimeBoundaryQuery.MIN_TIME,
+ minTime.toString(),
+ TimeBoundaryQuery.MAX_TIME,
+ maxTime.toString()
+ );
+ } else if (maxTime != null) { // minTime is null here, so only the max entry is written
+ value = ImmutableMap.of(
+ TimeBoundaryQuery.MAX_TIME,
+ maxTime.toString()
+ );
+ } else { // reached whenever maxTime is null; minTime.toString() below throws NullPointerException if minTime is null too
+ value = ImmutableMap.of(
+ TimeBoundaryQuery.MIN_TIME,
+ minTime.toString()
+ );
+ }
+
return Arrays.asList(
new Result<>(
timestamp,
- new TimeBoundaryResultValue(
- ImmutableMap.of(
- TimeBoundaryQuery.MIN_TIME,
- minTime.toString(),
- TimeBoundaryQuery.MAX_TIME,
- maxTime.toString()
- )
- )
+ new TimeBoundaryResultValue(value)
)
);
}