Merge pull request #584 from metamx/fix-segment-metadata-query

Fix segment metadata query
diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java
index 8767641..084652b3 100644
--- a/processing/src/main/java/io/druid/query/Druids.java
+++ b/processing/src/main/java/io/druid/query/Druids.java
@@ -29,6 +29,8 @@
 import io.druid.query.filter.NotDimFilter;
 import io.druid.query.filter.OrDimFilter;
 import io.druid.query.filter.SelectorDimFilter;
+import io.druid.query.metadata.metadata.ColumnIncluderator;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
 import io.druid.query.search.SearchResultValue;
 import io.druid.query.search.search.InsensitiveContainsSearchQuerySpec;
 import io.druid.query.search.search.SearchQuery;
@@ -823,4 +825,112 @@
   {
     return new ResultBuilder<TimeBoundaryResultValue>();
   }
+
+  /**
+   * A Builder for SegmentMetadataQuery.
+   * <p/>
+   * Required: dataSource(), intervals() must be called before build()
+   * <p/>
+   * Usage example:
+   * <pre><code>
+   *   SegmentMetadataQuery query = new SegmentMetadataQueryBuilder()
+   *                                  .dataSource("Example")
+   *                                  .intervals("2010/2013")
+   *                                  .build();
+   * </code></pre>
+   *
+   * @see io.druid.query.metadata.metadata.SegmentMetadataQuery
+   */
+  public static class SegmentMetadataQueryBuilder
+  {
+    private DataSource dataSource;
+    private QuerySegmentSpec querySegmentSpec;
+    private ColumnIncluderator toInclude;
+    private Boolean merge;
+    private Map<String, Object> context;
+
+    public SegmentMetadataQueryBuilder()
+    {
+      // Nothing to assign: every field starts out null, i.e. "not set yet".
+    }
+
+    /** Constructs the query from the current field values; fields never set are passed along as null. */
+    public SegmentMetadataQuery build()
+    {
+      return new SegmentMetadataQuery(dataSource, querySegmentSpec, toInclude, merge, context);
+    }
+
+    /** Returns a new builder holding the values of {@code builder}; this instance is not read or changed. */
+    public SegmentMetadataQueryBuilder copy(SegmentMetadataQueryBuilder builder)
+    {
+      final SegmentMetadataQueryBuilder copied = new SegmentMetadataQueryBuilder()
+          .dataSource(builder.dataSource)
+          .intervals(builder.querySegmentSpec)
+          .toInclude(builder.toInclude)
+          .context(builder.context);
+      copied.merge = builder.merge; // plain field copy: merge(boolean) would NPE when unboxing an unset merge
+      return copied;
+    }
+
+    /** Sets the data source to a {@link TableDataSource} created from the given string. */
+    public SegmentMetadataQueryBuilder dataSource(String ds)
+    {
+      return dataSource(new TableDataSource(ds));
+    }
+
+    /** Sets the data source that build() hands to the query. */
+    public SegmentMetadataQueryBuilder dataSource(DataSource ds)
+    {
+      dataSource = ds;
+      return this;
+    }
+
+    /** Sets the segment spec that build() hands to the query. */
+    public SegmentMetadataQueryBuilder intervals(QuerySegmentSpec q)
+    {
+      querySegmentSpec = q;
+      return this;
+    }
+
+    /** Sets the segment spec to a {@link LegacySegmentSpec} created from the given string. */
+    public SegmentMetadataQueryBuilder intervals(String s)
+    {
+      querySegmentSpec = new LegacySegmentSpec(s);
+      return this;
+    }
+
+    /** Sets the segment spec to a {@link LegacySegmentSpec} created from the given intervals. */
+    public SegmentMetadataQueryBuilder intervals(List<Interval> l)
+    {
+      querySegmentSpec = new LegacySegmentSpec(l);
+      return this;
+    }
+
+    /** Sets the {@link ColumnIncluderator} that build() hands to the query. */
+    public SegmentMetadataQueryBuilder toInclude(ColumnIncluderator toInclude)
+    {
+      this.toInclude = toInclude;
+      return this;
+    }
+
+    /** Sets the merge flag that build() hands to the query; it stays null if this is never called. */
+    public SegmentMetadataQueryBuilder merge(boolean merge)
+    {
+      this.merge = merge;
+      return this;
+    }
+
+    /** Sets the context map that build() hands to the query; the map is kept by reference, not copied. */
+    public SegmentMetadataQueryBuilder context(Map<String, Object> c)
+    {
+      context = c;
+      return this;
+    }
+  }
+
+  /** Returns a new {@link SegmentMetadataQueryBuilder} with nothing set. */
+  public static SegmentMetadataQueryBuilder newSegmentMetadataQueryBuilder()
+  {
+    return new SegmentMetadataQueryBuilder();
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
index c5b64c2..9fa393c 100644
--- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
+++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
@@ -25,6 +25,7 @@
 import com.metamx.common.guava.ExecutorExecutingSequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
+import io.druid.query.AbstractPrioritizedCallable;
 import io.druid.query.ConcatQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
@@ -116,17 +117,14 @@
                   @Override
                   public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
                   {
-
+                    final int priority = query.getContextPriority(0);
                     Future<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
-                        new Callable<Sequence<SegmentAnalysis>>()
+                        new AbstractPrioritizedCallable<Sequence<SegmentAnalysis>>(priority)
                         {
                           @Override
                           public Sequence<SegmentAnalysis> call() throws Exception
                           {
-                            return new ExecutorExecutingSequence<SegmentAnalysis>(
-                                input.run(query),
-                                queryExecutor
-                            );
+                            return input.run(query);
                           }
                         }
                     );
diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java
index 6e6dd9c..5d59629 100644
--- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java
+++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java
@@ -21,17 +21,76 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.metamx.common.guava.Sequences;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.Druids;
 import io.druid.query.Query;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerFactory;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.metadata.metadata.ColumnAnalysis;
+import io.druid.query.metadata.metadata.ListColumnIncluderator;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.segment.QueryableIndexSegment;
+import io.druid.segment.TestIndex;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+
 public class SegmentMetadataQueryTest
 {
+  // Runner under test: whatever makeQueryRunner returns for a SegmentMetadataQueryRunnerFactory.
+  // Declared with the raw QueryRunner type and assigned once per test instance.
+  @SuppressWarnings("unchecked")
+  private final QueryRunner runner = makeQueryRunner(new SegmentMetadataQueryRunnerFactory());
   private ObjectMapper mapper = new DefaultObjectMapper();
 
+  /**
+   * Obtains a runner from QueryRunnerTestHelper for {@code factory} and a segment over TestIndex.getMMappedTestIndex().
+   */
+  @SuppressWarnings("unchecked")
+  public static QueryRunner makeQueryRunner(QueryRunnerFactory factory)
+  {
+    final QueryableIndexSegment testSegment =
+        new QueryableIndexSegment(QueryRunnerTestHelper.segmentId, TestIndex.getMMappedTestIndex());
+    return QueryRunnerTestHelper.makeQueryRunner(factory, testSegment);
+  }
+
+  /** Runs a merged query limited to the "placement" column and checks the first analysis returned. */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testSegmentMetadataQuery()
+  {
+    SegmentMetadataQuery query = Druids.newSegmentMetadataQueryBuilder()
+                                       .dataSource("testing")
+                                       .intervals("2013/2014")
+                                       .toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
+                                       .merge(true)
+                                       .build();
+
+    Iterable<SegmentAnalysis> results = Sequences.toList(
+        runner.run(query),
+        Lists.<SegmentAnalysis>newArrayList()
+    );
+    SegmentAnalysis val = results.iterator().next();
+    Assert.assertEquals("testSegment", val.getId());
+    Assert.assertEquals(69843, val.getSize());
+    Assert.assertEquals(
+        Arrays.asList(new Interval("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")),
+        val.getIntervals()
+    );
+    Assert.assertEquals(1, val.getColumns().size());
+    final ColumnAnalysis columnAnalysis = val.getColumns().get("placement");
+    Assert.assertEquals("STRING", columnAnalysis.getType());
+    Assert.assertEquals(10881, columnAnalysis.getSize());
+    Assert.assertEquals(Integer.valueOf(1), columnAnalysis.getCardinality());
+    Assert.assertNull(columnAnalysis.getErrorMessage());
+  }
+
   @Test
   public void testSerde() throws Exception
   {