Merge pull request #712 from metamx/indexing-extra-classpaths

Allow indexing tasks to specify extra classpaths.
diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java
index ac2a5f8..49a8fb1 100644
--- a/processing/src/main/java/io/druid/query/TimewarpOperator.java
+++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
-import com.google.common.collect.ImmutableMap;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import io.druid.data.input.MapBasedRow;
@@ -80,7 +79,7 @@
     return new QueryRunner<T>()
     {
       @Override
-      public Sequence<T> run(Query<T> query)
+      public Sequence<T> run(final Query<T> query)
       {
         final long offset = computeOffset(now);
 
@@ -103,12 +102,19 @@
                   Object value = res.getValue();
                   if (value instanceof TimeBoundaryResultValue) {
                     TimeBoundaryResultValue boundary = (TimeBoundaryResultValue) value;
-                    value = new TimeBoundaryResultValue(
-                        ImmutableMap.of(
-                            TimeBoundaryQuery.MIN_TIME, boundary.getMinTime().minus(offset),
-                            TimeBoundaryQuery.MAX_TIME, new DateTime(Math.min(boundary.getMaxTime().getMillis() - offset, now))
-                        )
-                    );
+
+                    DateTime minTime = null;
+                    try{
+                      minTime = boundary.getMinTime();
+                    } catch(IllegalArgumentException e) {}
+
+                    final DateTime maxTime = boundary.getMaxTime();
+
+                    return (T) ((TimeBoundaryQuery) query).buildResult(
+                        new DateTime(Math.min(res.getTimestamp().getMillis() - offset, now)),
+                        minTime != null ? minTime.minus(offset) : null,
+                        maxTime != null ? new DateTime(Math.min(maxTime.getMillis() - offset, now)) : null
+                    ).iterator().next();
                   }
                   return (T) new Result(res.getTimestamp().minus(offset), value);
                 } else if (input instanceof MapBasedRow) {
diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index fdde44b..088ec0b 100644
--- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -145,9 +145,10 @@
       @Override
       public byte[] computeCacheKey(TimeBoundaryQuery query)
       {
-        return ByteBuffer.allocate(2)
+        final byte[] cacheKey = query.getCacheKey();
+        return ByteBuffer.allocate(1 + cacheKey.length)
                          .put(TIMEBOUNDARY_QUERY)
-                         .put(query.getCacheKey())
+                         .put(cacheKey)
                          .array();
       }
 
diff --git a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java
index 4dfb8bb..f1703e8 100644
--- a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java
+++ b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java
@@ -26,6 +26,7 @@
 import com.metamx.common.guava.Sequences;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.timeboundary.TimeBoundaryResultValue;
 import io.druid.query.timeseries.TimeseriesResultValue;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -122,5 +123,46 @@
         Sequences.toList(queryRunner.run(query), Lists.<Result<TimeseriesResultValue>>newArrayList())
     );
 
+
+    TimewarpOperator<Result<TimeBoundaryResultValue>> timeBoundaryOperator = new TimewarpOperator<>(
+        new Interval(new DateTime("2014-01-01"), new DateTime("2014-01-15")),
+        new Period("P1W"),
+        new DateTime("2014-01-06") // align on Monday
+    );
+
+    QueryRunner<Result<TimeBoundaryResultValue>> timeBoundaryRunner = timeBoundaryOperator.postProcess(
+        new QueryRunner<Result<TimeBoundaryResultValue>>()
+        {
+          @Override
+          public Sequence<Result<TimeBoundaryResultValue>> run(Query<Result<TimeBoundaryResultValue>> query)
+          {
+            return Sequences.simple(
+                ImmutableList.of(
+                    new Result<>(
+                        new DateTime("2014-01-12"),
+                        new TimeBoundaryResultValue(ImmutableMap.<String, Object>of("maxTime", new DateTime("2014-01-12")))
+                    )
+                )
+            );
+          }
+        },
+        new DateTime("2014-08-02").getMillis()
+    );
+
+    final Query<Result<TimeBoundaryResultValue>> timeBoundaryQuery =
+        Druids.newTimeBoundaryQueryBuilder()
+              .dataSource("dummy")
+              .build();
+
+    Assert.assertEquals(
+        Lists.newArrayList(
+            new Result<>(
+                new DateTime("2014-08-02"),
+                new TimeBoundaryResultValue(ImmutableMap.<String, Object>of("maxTime", new DateTime("2014-08-02")))
+            )
+        ),
+        Sequences.toList(timeBoundaryRunner.run(timeBoundaryQuery), Lists.<Result<TimeBoundaryResultValue>>newArrayList())
+    );
+
   }
 }
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 8437261..f99f3d4 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -929,6 +929,48 @@
         new Interval("2011-01-01/2011-01-10"),
         makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), new DateTime("2011-01-10"))
     );
+
+    testQueryCaching(
+        client,
+        Druids.newTimeBoundaryQueryBuilder()
+              .dataSource(CachingClusteredClientTest.DATA_SOURCE)
+              .intervals(CachingClusteredClientTest.SEG_SPEC)
+              .context(CachingClusteredClientTest.CONTEXT)
+              .bound(TimeBoundaryQuery.MAX_TIME)
+              .build(),
+        new Interval("2011-01-01/2011-01-02"),
+        makeTimeBoundaryResult(new DateTime("2011-01-01"), null, new DateTime("2011-01-02")),
+
+        new Interval("2011-01-01/2011-01-03"),
+        makeTimeBoundaryResult(new DateTime("2011-01-02"), null, new DateTime("2011-01-03")),
+
+        new Interval("2011-01-01/2011-01-10"),
+        makeTimeBoundaryResult(new DateTime("2011-01-05"), null, new DateTime("2011-01-10")),
+
+        new Interval("2011-01-01/2011-01-10"),
+        makeTimeBoundaryResult(new DateTime("2011-01-05T01"), null, new DateTime("2011-01-10"))
+    );
+
+    testQueryCaching(
+        client,
+        Druids.newTimeBoundaryQueryBuilder()
+              .dataSource(CachingClusteredClientTest.DATA_SOURCE)
+              .intervals(CachingClusteredClientTest.SEG_SPEC)
+              .context(CachingClusteredClientTest.CONTEXT)
+              .bound(TimeBoundaryQuery.MIN_TIME)
+              .build(),
+        new Interval("2011-01-01/2011-01-02"),
+        makeTimeBoundaryResult(new DateTime("2011-01-01"), new DateTime("2011-01-01"), null),
+
+        new Interval("2011-01-01/2011-01-03"),
+        makeTimeBoundaryResult(new DateTime("2011-01-02"), new DateTime("2011-01-02"), null),
+
+        new Interval("2011-01-01/2011-01-10"),
+        makeTimeBoundaryResult(new DateTime("2011-01-05"), new DateTime("2011-01-05"), null),
+
+        new Interval("2011-01-01/2011-01-10"),
+        makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), null)
+    );
   }
 
   private Iterable<Result<TimeBoundaryResultValue>> makeTimeBoundaryResult(
@@ -937,17 +979,30 @@
       DateTime maxTime
   )
   {
+    final Object value;
+    if (minTime != null && maxTime != null) {
+      value = ImmutableMap.of(
+          TimeBoundaryQuery.MIN_TIME,
+          minTime.toString(),
+          TimeBoundaryQuery.MAX_TIME,
+          maxTime.toString()
+      );
+    } else if (maxTime != null) {
+      value = ImmutableMap.of(
+          TimeBoundaryQuery.MAX_TIME,
+          maxTime.toString()
+      );
+    } else {
+      value = ImmutableMap.of(
+          TimeBoundaryQuery.MIN_TIME,
+          minTime.toString()
+      );
+    }
+
     return Arrays.asList(
         new Result<>(
             timestamp,
-            new TimeBoundaryResultValue(
-                ImmutableMap.of(
-                    TimeBoundaryQuery.MIN_TIME,
-                    minTime.toString(),
-                    TimeBoundaryQuery.MAX_TIME,
-                    maxTime.toString()
-                )
-            )
+            new TimeBoundaryResultValue(value)
         )
     );
   }