Merge pull request #70 from metamx/timezone-tests

More time zone tests
diff --git a/client/src/main/java/com/metamx/druid/BaseQuery.java b/client/src/main/java/com/metamx/druid/BaseQuery.java
index 4538467..76448ed 100644
--- a/client/src/main/java/com/metamx/druid/BaseQuery.java
+++ b/client/src/main/java/com/metamx/druid/BaseQuery.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.metamx.common.guava.Sequence;
+import com.metamx.druid.query.QueryRunner;
 import com.metamx.druid.query.segment.QuerySegmentSpec;
 import com.metamx.druid.query.segment.QuerySegmentWalker;
 import org.codehaus.jackson.annotate.JsonProperty;
@@ -72,7 +73,12 @@
   @Override
   public Sequence<T> run(QuerySegmentWalker walker)
   {
-    return querySegmentSpec.lookup(this, walker).run(this);
+    return run(querySegmentSpec.lookup(this, walker));
+  }
+
+  public Sequence<T> run(QueryRunner<T> runner)
+  {
+    return runner.run(this);
   }
 
   @Override
diff --git a/client/src/main/java/com/metamx/druid/Query.java b/client/src/main/java/com/metamx/druid/Query.java
index bd1dc49..4c4e7f7 100644
--- a/client/src/main/java/com/metamx/druid/Query.java
+++ b/client/src/main/java/com/metamx/druid/Query.java
@@ -20,6 +20,7 @@
 package com.metamx.druid;
 
 import com.metamx.common.guava.Sequence;
+import com.metamx.druid.query.QueryRunner;
 import com.metamx.druid.query.group.GroupByQuery;
 import com.metamx.druid.query.metadata.SegmentMetadataQuery;
 import com.metamx.druid.query.search.SearchQuery;
@@ -57,6 +58,8 @@
 
   public Sequence<T> run(QuerySegmentWalker walker);
 
+  public Sequence<T> run(QueryRunner<T> runner);
+
   public List<Interval> getIntervals();
 
   public Duration getDuration();
diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
index 5d640dc..163f198 100644
--- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
+++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
@@ -54,6 +54,7 @@
 import com.metamx.druid.result.BySegmentResultValueClass;
 import com.metamx.druid.result.Result;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -110,7 +111,7 @@
   public Sequence<T> run(final Query<T> query)
   {
     final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
-    final CacheStrategy<T, Query<T>> strategy = toolChest.getCacheStrategy(query);
+    final CacheStrategy<T, Object, Query<T>> strategy = toolChest.getCacheStrategy(query);
 
     final Map<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap();
 
@@ -241,6 +242,7 @@
             }
 
             final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache();
+            final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
             for (Pair<DateTime, byte[]> cachedResultPair : cachedResults) {
               final byte[] cachedResult = cachedResultPair.rhs;
               Sequence<Object> cachedSequence = new BaseSequence<Object, Iterator<Object>>(
@@ -255,7 +257,8 @@
                         }
 
                         return objectMapper.readValues(
-                            objectMapper.getJsonFactory().createJsonParser(cachedResult), Object.class
+                            objectMapper.getJsonFactory().createJsonParser(cachedResult),
+                            cacheObjectClazz
                         );
                       }
                       catch (IOException e) {
diff --git a/client/src/main/java/com/metamx/druid/query/CacheStrategy.java b/client/src/main/java/com/metamx/druid/query/CacheStrategy.java
index abdbe4d..f8f5098 100644
--- a/client/src/main/java/com/metamx/druid/query/CacheStrategy.java
+++ b/client/src/main/java/com/metamx/druid/query/CacheStrategy.java
@@ -22,16 +22,19 @@
 import com.google.common.base.Function;
 import com.metamx.common.guava.Sequence;
 import com.metamx.druid.Query;
+import org.codehaus.jackson.type.TypeReference;
 
 /**
 */
-public interface CacheStrategy<T, QueryType extends Query<T>>
+public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
 {
   public byte[] computeCacheKey(QueryType query);
 
-  public Function<T, Object> prepareForCache();
+  public TypeReference<CacheType> getCacheObjectClazz();
 
-  public Function<Object, T> pullFromCache();
+  public Function<T, CacheType> prepareForCache();
+
+  public Function<CacheType, T> pullFromCache();
 
   public Sequence<T> mergeSequences(Sequence<Sequence<T>> seqOfSequences);
 }
diff --git a/client/src/main/java/com/metamx/druid/query/QueryToolChest.java b/client/src/main/java/com/metamx/druid/query/QueryToolChest.java
index ebf77f6..bec2170 100644
--- a/client/src/main/java/com/metamx/druid/query/QueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/QueryToolChest.java
@@ -44,7 +44,7 @@
   public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
   public Function<ResultType, ResultType> makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn);
   public TypeReference<ResultType> getResultTypeReference();
-  public CacheStrategy<ResultType, QueryType> getCacheStrategy(QueryType query);
+  public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query);
   public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner);
   public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner);
 }
diff --git a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java
index 6ce071f..9dcf611 100644
--- a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java
@@ -178,7 +178,7 @@
   }
 
   @Override
-  public CacheStrategy<Row, GroupByQuery> getCacheStrategy(GroupByQuery query)
+  public CacheStrategy<Row, Object, GroupByQuery> getCacheStrategy(GroupByQuery query)
   {
     return null;
   }
diff --git a/client/src/main/java/com/metamx/druid/query/metadata/AllColumnIncluderator.java b/client/src/main/java/com/metamx/druid/query/metadata/AllColumnIncluderator.java
new file mode 100644
index 0000000..cd96b5d
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/metadata/AllColumnIncluderator.java
@@ -0,0 +1,37 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.query.metadata;
+
+/**
+ */
+public class AllColumnIncluderator implements ColumnIncluderator
+{
+  @Override
+  public boolean include(String columnName)
+  {
+    return true;
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return ALL_CACHE_PREFIX;
+  }
+}
diff --git a/client/src/main/java/com/metamx/druid/query/metadata/ColumnAnalysis.java b/client/src/main/java/com/metamx/druid/query/metadata/ColumnAnalysis.java
new file mode 100644
index 0000000..33b894c
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/metadata/ColumnAnalysis.java
@@ -0,0 +1,119 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.query.metadata;
+
+import com.google.common.base.Preconditions;
+import com.metamx.druid.index.column.ValueType;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+*/
+public class ColumnAnalysis
+{
+  private static final String ERROR_PREFIX = "error:";
+
+  public static ColumnAnalysis error(String reason)
+  {
+    return new ColumnAnalysis(ERROR_PREFIX + reason, -1, null);
+  }
+
+  private final String type;
+  private final long size;
+  private final Integer cardinality;
+
+  @JsonCreator
+  public ColumnAnalysis(
+      @JsonProperty("type") ValueType type,
+      @JsonProperty("size") long size,
+      @JsonProperty("cardinality") Integer cardinality
+  )
+  {
+    this(type.name(), size, cardinality);
+  }
+
+  private ColumnAnalysis(
+      String type,
+      long size,
+      Integer cardinality
+  )
+  {
+    this.type = type;
+    this.size = size;
+    this.cardinality = cardinality;
+  }
+
+  @JsonProperty
+  public String getType()
+  {
+    return type;
+  }
+
+  @JsonProperty
+  public long getSize()
+  {
+    return size;
+  }
+
+  @JsonProperty
+  public Integer getCardinality()
+  {
+    return cardinality;
+  }
+
+  public boolean isError()
+  {
+    return type.startsWith(ERROR_PREFIX);
+  }
+
+  public ColumnAnalysis fold(ColumnAnalysis rhs)
+  {
+    if (rhs == null) {
+      return this;
+    }
+
+    if (!type.equals(rhs.getType())) {
+      return ColumnAnalysis.error("cannot_merge_diff_types");
+    }
+
+    Integer cardinality = getCardinality();
+    final Integer rhsCardinality = rhs.getCardinality();
+    if (cardinality == null) {
+      cardinality = rhsCardinality;
+    }
+    else {
+      if (rhsCardinality != null) {
+        cardinality = Math.max(cardinality, rhsCardinality);
+      }
+    }
+
+    return new ColumnAnalysis(type, size + rhs.getSize(), cardinality);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ColumnAnalysis{" +
+           "type='" + type + '\'' +
+           ", size=" + size +
+           ", cardinality=" + cardinality +
+           '}';
+  }
+}
diff --git a/client/src/main/java/com/metamx/druid/query/metadata/ColumnIncluderator.java b/client/src/main/java/com/metamx/druid/query/metadata/ColumnIncluderator.java
new file mode 100644
index 0000000..90533c4
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/metadata/ColumnIncluderator.java
@@ -0,0 +1,41 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.query.metadata;
+
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.annotate.JsonTypeInfo;
+
+/**
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "none", value= NoneColumnIncluderator.class),
+    @JsonSubTypes.Type(name = "all", value= AllColumnIncluderator.class),
+    @JsonSubTypes.Type(name = "list", value= ListColumnIncluderator.class)
+})
+public interface ColumnIncluderator
+{
+  public static final byte[] NONE_CACHE_PREFIX = new byte[]{0x0};
+  public static final byte[] ALL_CACHE_PREFIX = new byte[]{0x1};
+  public static final byte[] LIST_CACHE_PREFIX = new byte[]{0x2};
+
+  public boolean include(String columnName);
+  public byte[] getCacheKey();
+}
diff --git a/client/src/main/java/com/metamx/druid/query/metadata/ListColumnIncluderator.java b/client/src/main/java/com/metamx/druid/query/metadata/ListColumnIncluderator.java
new file mode 100644
index 0000000..e74661d
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/metadata/ListColumnIncluderator.java
@@ -0,0 +1,82 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.query.metadata;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ */
+public class ListColumnIncluderator implements ColumnIncluderator
+{
+  private final Set<String> columns;
+
+  @JsonCreator
+  public ListColumnIncluderator(
+      @JsonProperty("columns") List<String> columns
+  )
+  {
+    this.columns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+    this.columns.addAll(columns);
+  }
+
+  @JsonProperty
+  public Set<String> getColumns()
+  {
+    return Collections.unmodifiableSet(columns);
+  }
+
+  @Override
+  public boolean include(String columnName)
+  {
+    return columns.contains(columnName);
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    int size = 1;
+    List<byte[]> columns = Lists.newArrayListWithExpectedSize(this.columns.size());
+
+    for (String column : this.columns) {
+      final byte[] bytes = column.getBytes(Charsets.UTF_8);
+      columns.add(bytes);
+      size += bytes.length;
+    }
+
+    final ByteBuffer bytes = ByteBuffer.allocate(size).put(LIST_CACHE_PREFIX);
+    for (byte[] column : columns) {
+      bytes.put(column);
+    }
+
+    return bytes.array();
+  }
+}
diff --git a/client/src/main/java/com/metamx/druid/query/metadata/NoneColumnIncluderator.java b/client/src/main/java/com/metamx/druid/query/metadata/NoneColumnIncluderator.java
new file mode 100644
index 0000000..d1d66d2
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/metadata/NoneColumnIncluderator.java
@@ -0,0 +1,37 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.query.metadata;
+
+/**
+ */
+public class NoneColumnIncluderator implements ColumnIncluderator
+{
+  @Override
+  public boolean include(String columnName)
+  {
+    return false;
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return NONE_CACHE_PREFIX;
+  }
+}
diff --git a/client/src/main/java/com/metamx/druid/query/metadata/SegmentAnalysis.java b/client/src/main/java/com/metamx/druid/query/metadata/SegmentAnalysis.java
new file mode 100644
index 0000000..1182bfe
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/metadata/SegmentAnalysis.java
@@ -0,0 +1,94 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.query.metadata;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.joda.time.Interval;
+
+import java.util.List;
+import java.util.Map;
+
+public class SegmentAnalysis
+{
+  private final String id;
+  private final List<Interval> interval;
+  private final Map<String, ColumnAnalysis> columns;
+  private final long size;
+
+  @JsonCreator
+  public SegmentAnalysis(
+      @JsonProperty("id") String id,
+      @JsonProperty("intervals") List<Interval> interval,
+      @JsonProperty("columns") Map<String, ColumnAnalysis> columns,
+      @JsonProperty("size") long size
+
+  )
+  {
+    this.id = id;
+    this.interval = interval;
+    this.columns = columns;
+    this.size = size;
+  }
+
+  @JsonProperty
+  public String getId()
+  {
+    return id;
+  }
+
+  @JsonProperty
+  public List<Interval> getIntervals()
+  {
+    return interval;
+  }
+
+  @JsonProperty
+  public Map<String, ColumnAnalysis> getColumns()
+  {
+    return columns;
+  }
+
+  @JsonProperty
+  public long getSize()
+  {
+    return size;
+  }
+
+  public String toDetailedString()
+  {
+    return "SegmentAnalysis{" +
+           "id='" + id + '\'' +
+           ", interval=" + interval +
+           ", columns=" + columns +
+           ", size=" + size +
+           '}';
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SegmentAnalysis{" +
+           "id='" + id + '\'' +
+           ", interval=" + interval +
+           ", size=" + size +
+           '}';
+  }
+}
diff --git a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQuery.java b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQuery.java
index e72b85a..7e0d04c 100644
--- a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQuery.java
+++ b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQuery.java
@@ -22,26 +22,40 @@
 import com.metamx.druid.BaseQuery;
 import com.metamx.druid.Query;
 import com.metamx.druid.query.segment.QuerySegmentSpec;
-import com.metamx.druid.result.Result;
-import com.metamx.druid.result.SegmentMetadataResultValue;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 import java.util.Map;
 
-public class SegmentMetadataQuery extends BaseQuery<Result<SegmentMetadataResultValue>>
+public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
 {
 
+  private final ColumnIncluderator toInclude;
+  private final boolean merge;
+
   public SegmentMetadataQuery(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
+      @JsonProperty("toInclude") ColumnIncluderator toInclude,
+      @JsonProperty("merge") Boolean merge,
       @JsonProperty("context") Map<String, String> context
   )
   {
-    super(
-        dataSource,
-        querySegmentSpec,
-        context
-    );
+    super(dataSource, querySegmentSpec, context);
+
+    this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude;
+    this.merge = merge == null ? false : merge;
+  }
+
+  @JsonProperty
+  public ColumnIncluderator getToInclude()
+  {
+    return toInclude;
+  }
+
+  @JsonProperty
+  public boolean isMerge()
+  {
+    return merge;
   }
 
   @Override
@@ -57,22 +71,16 @@
   }
 
   @Override
-  public Query<Result<SegmentMetadataResultValue>> withOverriddenContext(Map<String, String> contextOverride)
+  public Query<SegmentAnalysis> withOverriddenContext(Map<String, String> contextOverride)
   {
     return new SegmentMetadataQuery(
-        getDataSource(),
-        getQuerySegmentSpec(),
-        computeOverridenContext(contextOverride)
+        getDataSource(), getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride)
     );
   }
 
   @Override
-  public Query<Result<SegmentMetadataResultValue>> withQuerySegmentSpec(QuerySegmentSpec spec)
+  public Query<SegmentAnalysis> withQuerySegmentSpec(QuerySegmentSpec spec)
   {
-    return new SegmentMetadataQuery(
-        getDataSource(),
-        spec,
-        getContext()
-    );
+    return new SegmentMetadataQuery(getDataSource(), spec, toInclude, merge, getContext());
   }
 }
diff --git a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
index 14ef61b..160c23c 100644
--- a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -22,32 +22,116 @@
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.metamx.common.guava.ConcatSequence;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.metamx.common.ISE;
+import com.metamx.common.guava.MergeSequence;
 import com.metamx.common.guava.Sequence;
-import com.metamx.common.guava.Sequences;
+import com.metamx.common.guava.nary.BinaryFn;
+import com.metamx.druid.Query;
+import com.metamx.druid.collect.OrderedMergeSequence;
 import com.metamx.druid.query.CacheStrategy;
-import com.metamx.druid.query.ConcatQueryRunner;
 import com.metamx.druid.query.MetricManipulationFn;
 import com.metamx.druid.query.QueryRunner;
 import com.metamx.druid.query.QueryToolChest;
-import com.metamx.druid.result.Result;
-import com.metamx.druid.result.SegmentMetadataResultValue;
+import com.metamx.druid.query.ResultMergeQueryRunner;
+import com.metamx.druid.utils.JodaUtils;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import org.codehaus.jackson.type.TypeReference;
 import org.joda.time.Interval;
 import org.joda.time.Minutes;
 
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<Result<SegmentMetadataResultValue>, SegmentMetadataQuery>
+
+public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
 {
-
-  private static final TypeReference<Result<SegmentMetadataResultValue>> TYPE_REFERENCE = new TypeReference<Result<SegmentMetadataResultValue>>(){};
+  private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>(){};
+  private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
 
   @Override
-  public QueryRunner<Result<SegmentMetadataResultValue>> mergeResults(final QueryRunner<Result<SegmentMetadataResultValue>> runner)
+  public QueryRunner<SegmentAnalysis> mergeResults(final QueryRunner<SegmentAnalysis> runner)
   {
-    return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(Sequences.simple(ImmutableList.of(runner)));
+    return new ResultMergeQueryRunner<SegmentAnalysis>(runner)
+    {
+      @Override
+      protected Ordering<SegmentAnalysis> makeOrdering(Query<SegmentAnalysis> query)
+      {
+        if (((SegmentMetadataQuery) query).isMerge()) {
+          // Merge everything always
+          return new Ordering<SegmentAnalysis>()
+          {
+            @Override
+            public int compare(
+                @Nullable SegmentAnalysis left, @Nullable SegmentAnalysis right
+            )
+            {
+              return 0;
+            }
+          };
+        }
+
+        return getOrdering(); // No two elements should be equal, so it should never merge
+      }
+
+      @Override
+      protected BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis> createMergeFn(final Query<SegmentAnalysis> inQ)
+      {
+        return new BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis>()
+        {
+          private final SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
+
+          @Override
+          public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2)
+          {
+            if (arg1 == null) {
+              return arg2;
+            }
+
+            if (arg2 == null) {
+              return arg1;
+            }
+
+            if (!query.isMerge()) {
+              throw new ISE("Merging when a merge isn't supposed to happen[%s], [%s]", arg1, arg2);
+            }
+
+            List<Interval> newIntervals = JodaUtils.condenseIntervals(
+                Iterables.concat(arg1.getIntervals(), arg2.getIntervals())
+            );
+
+            final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
+            final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();
+            Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
+
+            Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet());
+            for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) {
+              final String columnName = entry.getKey();
+              columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName)));
+              rightColumnNames.remove(columnName);
+            }
+
+            for (String columnName : rightColumnNames) {
+              columns.put(columnName, rightColumns.get(columnName));
+            }
+
+            return new SegmentAnalysis("merged", newIntervals, columns, arg1.getSize() + arg2.getSize());
+          }
+        };
+      }
+    };
+  }
+
+  @Override
+  public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
+  {
+    return new OrderedMergeSequence<SegmentAnalysis>(getOrdering(), seqOfSequences);
   }
 
   @Override
@@ -67,13 +151,7 @@
   }
 
   @Override
-  public Sequence<Result<SegmentMetadataResultValue>> mergeSequences(Sequence<Sequence<Result<SegmentMetadataResultValue>>> seqOfSequences)
-  {
-    return new ConcatSequence<Result<SegmentMetadataResultValue>>(seqOfSequences);
-  }
-
-  @Override
-  public Function<Result<SegmentMetadataResultValue>, Result<SegmentMetadataResultValue>> makeMetricManipulatorFn(
+  public Function<SegmentAnalysis, SegmentAnalysis> makeMetricManipulatorFn(
       SegmentMetadataQuery query, MetricManipulationFn fn
   )
   {
@@ -81,26 +159,87 @@
   }
 
   @Override
-  public TypeReference<Result<SegmentMetadataResultValue>> getResultTypeReference()
+  public TypeReference<SegmentAnalysis> getResultTypeReference()
   {
     return TYPE_REFERENCE;
   }
 
   @Override
-  public CacheStrategy<Result<SegmentMetadataResultValue>, SegmentMetadataQuery> getCacheStrategy(SegmentMetadataQuery query)
+  public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> getCacheStrategy(SegmentMetadataQuery query)
   {
-    return null;
+    return new CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>()
+    {
+      @Override
+      public byte[] computeCacheKey(SegmentMetadataQuery query)
+      {
+        byte[] includerBytes = query.getToInclude().getCacheKey();
+        return ByteBuffer.allocate(1 + includerBytes.length)
+                         .put(SEGMENT_METADATA_CACHE_PREFIX)
+                         .put(includerBytes)
+                         .array();
+      }
+
+      @Override
+      public TypeReference<SegmentAnalysis> getCacheObjectClazz()
+      {
+        return getResultTypeReference();
+      }
+
+      @Override
+      public Function<SegmentAnalysis, SegmentAnalysis> prepareForCache()
+      {
+        return new Function<SegmentAnalysis, SegmentAnalysis>()
+        {
+          @Override
+          public SegmentAnalysis apply(@Nullable SegmentAnalysis input)
+          {
+            return input;
+          }
+        };
+      }
+
+      @Override
+      public Function<SegmentAnalysis, SegmentAnalysis> pullFromCache()
+      {
+        return new Function<SegmentAnalysis, SegmentAnalysis>()
+        {
+          @Override
+          public SegmentAnalysis apply(@Nullable SegmentAnalysis input)
+          {
+            return input;
+          }
+        };
+      }
+
+      @Override
+      public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
+      {
+        return new MergeSequence<SegmentAnalysis>(getOrdering(), seqOfSequences);
+      }
+    };
   }
 
   @Override
-  public QueryRunner<Result<SegmentMetadataResultValue>> preMergeQueryDecoration(QueryRunner<Result<SegmentMetadataResultValue>> runner)
+  public QueryRunner<SegmentAnalysis> preMergeQueryDecoration(QueryRunner<SegmentAnalysis> runner)
   {
     return runner;
   }
 
   @Override
-  public QueryRunner<Result<SegmentMetadataResultValue>> postMergeQueryDecoration(QueryRunner<Result<SegmentMetadataResultValue>> runner)
+  public QueryRunner<SegmentAnalysis> postMergeQueryDecoration(QueryRunner<SegmentAnalysis> runner)
   {
     return runner;
   }
+
+  private Ordering<SegmentAnalysis> getOrdering()
+  {
+    return new Ordering<SegmentAnalysis>()
+    {
+      @Override
+      public int compare(SegmentAnalysis left, SegmentAnalysis right)
+      {
+        return left.getId().compareTo(right.getId());
+      }
+    };
+  }
 }
diff --git a/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java
index 95757fc..ce3fcc8 100644
--- a/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java
@@ -82,6 +82,10 @@
     maxSearchLimit = PropUtils.getPropertyAsInt(props, "com.metamx.query.search.maxSearchLimit", 1000);
   }
 
+  private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
+  {
+  };
+
   @Override
   public QueryRunner<Result<SearchResultValue>> mergeResults(QueryRunner<Result<SearchResultValue>> runner)
   {
@@ -143,9 +147,9 @@
   }
 
   @Override
-  public CacheStrategy<Result<SearchResultValue>, SearchQuery> getCacheStrategy(SearchQuery query)
+  public CacheStrategy<Result<SearchResultValue>, Object, SearchQuery> getCacheStrategy(SearchQuery query)
   {
-    return new CacheStrategy<Result<SearchResultValue>, SearchQuery>()
+    return new CacheStrategy<Result<SearchResultValue>, Object, SearchQuery>()
     {
       @Override
       public byte[] computeCacheKey(SearchQuery query)
@@ -184,6 +188,12 @@
       }
 
       @Override
+      public TypeReference<Object> getCacheObjectClazz()
+      {
+        return OBJECT_TYPE_REFERENCE;
+      }
+
+      @Override
       public Function<Result<SearchResultValue>, Object> prepareForCache()
       {
         return new Function<Result<SearchResultValue>, Object>()
diff --git a/client/src/main/java/com/metamx/druid/query/segment/QuerySegmentSpecs.java b/client/src/main/java/com/metamx/druid/query/segment/QuerySegmentSpecs.java
new file mode 100644
index 0000000..9516db4
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/segment/QuerySegmentSpecs.java
@@ -0,0 +1,45 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.query.segment;
+
+import org.joda.time.Interval;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ */
+public class QuerySegmentSpecs
+{
+  public static QuerySegmentSpec create(String isoInterval)
+  {
+    return new LegacySegmentSpec(isoInterval);
+  }
+
+  public static QuerySegmentSpec create(Interval interval)
+  {
+    return create(Arrays.asList(interval));
+  }
+
+  public static QuerySegmentSpec create(List<Interval> intervals)
+  {
+    return new MultipleIntervalSegmentSpec(intervals);
+  }
+}
diff --git a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index 9d65ab9..5ee6c32 100644
--- a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -53,6 +53,9 @@
   private static final TypeReference<Result<TimeBoundaryResultValue>> TYPE_REFERENCE = new TypeReference<Result<TimeBoundaryResultValue>>()
   {
   };
+  private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
+  {
+  };
 
   @Override
   public QueryRunner<Result<TimeBoundaryResultValue>> mergeResults(
@@ -106,9 +109,9 @@
   }
 
   @Override
-  public CacheStrategy<Result<TimeBoundaryResultValue>, TimeBoundaryQuery> getCacheStrategy(TimeBoundaryQuery query)
+  public CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery> getCacheStrategy(TimeBoundaryQuery query)
   {
-    return new CacheStrategy<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>()
+    return new CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>()
     {
       @Override
       public byte[] computeCacheKey(TimeBoundaryQuery query)
@@ -120,6 +123,12 @@
       }
 
       @Override
+      public TypeReference<Object> getCacheObjectClazz()
+      {
+        return OBJECT_TYPE_REFERENCE;
+      }
+
+      @Override
       public Function<Result<TimeBoundaryResultValue>, Object> prepareForCache()
       {
         return new Function<Result<TimeBoundaryResultValue>, Object>()
diff --git a/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index df619f3..9c63350 100644
--- a/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -68,6 +68,9 @@
   private static final TypeReference<Result<TimeseriesResultValue>> TYPE_REFERENCE = new TypeReference<Result<TimeseriesResultValue>>()
   {
   };
+  private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
+  {
+  };
 
   @Override
   public QueryRunner<Result<TimeseriesResultValue>> mergeResults(QueryRunner<Result<TimeseriesResultValue>> queryRunner)
@@ -102,10 +105,7 @@
   @Override
   public Sequence<Result<TimeseriesResultValue>> mergeSequences(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
   {
-    return new OrderedMergeSequence<Result<TimeseriesResultValue>>(
-        getOrdering(),
-        seqOfSequences
-    );
+    return new OrderedMergeSequence<Result<TimeseriesResultValue>>(getOrdering(), seqOfSequences);
   }
 
   @Override
@@ -158,9 +158,9 @@
   }
 
   @Override
-  public CacheStrategy<Result<TimeseriesResultValue>, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query)
+  public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query)
   {
-    return new CacheStrategy<Result<TimeseriesResultValue>, TimeseriesQuery>()
+    return new CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>()
     {
       private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
       private final List<PostAggregator> postAggs = query.getPostAggregatorSpecs();
@@ -183,6 +183,12 @@
       }
 
       @Override
+      public TypeReference<Object> getCacheObjectClazz()
+      {
+        return OBJECT_TYPE_REFERENCE;
+      }
+
+      @Override
       public Function<Result<TimeseriesResultValue>, Object> prepareForCache()
       {
         return new Function<Result<TimeseriesResultValue>, Object>()
@@ -262,6 +268,4 @@
   {
     return Ordering.natural();
   }
-
-
 }
diff --git a/client/src/main/java/com/metamx/druid/result/SegmentMetadataResultValue.java b/client/src/main/java/com/metamx/druid/result/SegmentMetadataResultValue.java
deleted file mode 100644
index 5904264..0000000
--- a/client/src/main/java/com/metamx/druid/result/SegmentMetadataResultValue.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012  Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package com.metamx.druid.result;
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import java.util.Map;
-
-public class SegmentMetadataResultValue
-{
-  public static class Dimension {
-    @JsonProperty public long size;
-    @JsonProperty public int cardinality;
-
-    @JsonCreator
-    public Dimension(
-        @JsonProperty("size") long size,
-        @JsonProperty("cardinality") int cardinality
-    )
-    {
-      this.size = size;
-      this.cardinality = cardinality;
-    }
-  }
-  public static class Metric {
-    @JsonProperty public String type;
-    @JsonProperty public long size;
-
-    @JsonCreator
-    public Metric(
-            @JsonProperty("type") String type,
-            @JsonProperty("size") long size
-    )
-    {
-      this.type = type;
-      this.size = size;
-    }
-  }
-
-  private final String id;
-  private final Map<String, Dimension> dimensions;
-  private final Map<String, Metric> metrics;
-  private final long size;
-
-  @JsonCreator
-  public SegmentMetadataResultValue(
-      @JsonProperty("id") String id,
-      @JsonProperty("dimensions") Map<String, Dimension> dimensions,
-      @JsonProperty("metrics") Map<String, Metric> metrics,
-      @JsonProperty("size") long size
-
-  )
-  {
-    this.id = id;
-    this.dimensions = dimensions;
-    this.metrics = metrics;
-    this.size = size;
-  }
-
-  @JsonProperty
-  public String getId()
-  {
-    return id;
-  }
-
-  @JsonProperty
-  public Map<String, Dimension> getDimensions()
-  {
-    return dimensions;
-  }
-
-  @JsonProperty
-  public Map<String, Metric> getMetrics()
-  {
-    return metrics;
-  }
-
-  @JsonProperty
-  public long getSize()
-  {
-    return size;
-  }
-}
diff --git a/index-common/src/main/java/com/metamx/druid/index/column/BitmapIndex.java b/index-common/src/main/java/com/metamx/druid/index/column/BitmapIndex.java
index 884b126..6873292 100644
--- a/index-common/src/main/java/com/metamx/druid/index/column/BitmapIndex.java
+++ b/index-common/src/main/java/com/metamx/druid/index/column/BitmapIndex.java
@@ -25,5 +25,7 @@
  */
 public interface BitmapIndex
 {
+  public int getCardinality();
+  public String getValue(int index);
   public ImmutableConciseSet getConciseSet(String value);
 }
diff --git a/index-common/src/main/java/com/metamx/druid/index/column/DictionaryEncodedColumn.java b/index-common/src/main/java/com/metamx/druid/index/column/DictionaryEncodedColumn.java
index 9301734..62057cb 100644
--- a/index-common/src/main/java/com/metamx/druid/index/column/DictionaryEncodedColumn.java
+++ b/index-common/src/main/java/com/metamx/druid/index/column/DictionaryEncodedColumn.java
@@ -25,7 +25,7 @@
  */
 public interface DictionaryEncodedColumn
 {
-  public int size();
+  public int length();
   public boolean hasMultipleValues();
   public int getSingleValueRow(int rowNum);
   public IndexedInts getMultiValueRow(int rowNum);
diff --git a/index-common/src/main/java/com/metamx/druid/index/column/GenericColumn.java b/index-common/src/main/java/com/metamx/druid/index/column/GenericColumn.java
index c41b490..530eb2f 100644
--- a/index-common/src/main/java/com/metamx/druid/index/column/GenericColumn.java
+++ b/index-common/src/main/java/com/metamx/druid/index/column/GenericColumn.java
@@ -29,7 +29,7 @@
  */
 public interface GenericColumn extends Closeable
 {
-  public int size();
+  public int length();
   public ValueType getType();
   public boolean hasMultipleValues();
 
diff --git a/index-common/src/main/java/com/metamx/druid/index/column/IndexedFloatsGenericColumn.java b/index-common/src/main/java/com/metamx/druid/index/column/IndexedFloatsGenericColumn.java
index 423b046..5df2284 100644
--- a/index-common/src/main/java/com/metamx/druid/index/column/IndexedFloatsGenericColumn.java
+++ b/index-common/src/main/java/com/metamx/druid/index/column/IndexedFloatsGenericColumn.java
@@ -38,7 +38,7 @@
   }
 
   @Override
-  public int size()
+  public int length()
   {
     return column.size();
   }
diff --git a/index-common/src/main/java/com/metamx/druid/index/column/IndexedLongsGenericColumn.java b/index-common/src/main/java/com/metamx/druid/index/column/IndexedLongsGenericColumn.java
index 0e96a63..211dab3 100644
--- a/index-common/src/main/java/com/metamx/druid/index/column/IndexedLongsGenericColumn.java
+++ b/index-common/src/main/java/com/metamx/druid/index/column/IndexedLongsGenericColumn.java
@@ -38,7 +38,7 @@
   }
 
   @Override
-  public int size()
+  public int length()
   {
     return column.size();
   }
diff --git a/index-common/src/main/java/com/metamx/druid/index/column/SimpleColumn.java b/index-common/src/main/java/com/metamx/druid/index/column/SimpleColumn.java
index a388420..93825a8 100644
--- a/index-common/src/main/java/com/metamx/druid/index/column/SimpleColumn.java
+++ b/index-common/src/main/java/com/metamx/druid/index/column/SimpleColumn.java
@@ -62,7 +62,7 @@
     GenericColumn column = null;
     try {
       column = genericColumn.get();
-      return column.size();
+      return column.length();
     }
     finally {
       Closeables.closeQuietly(column);
diff --git a/index-common/src/main/java/com/metamx/druid/index/column/SimpleDictionaryEncodedColumn.java b/index-common/src/main/java/com/metamx/druid/index/column/SimpleDictionaryEncodedColumn.java
index 7a28a53..fbc3877 100644
--- a/index-common/src/main/java/com/metamx/druid/index/column/SimpleDictionaryEncodedColumn.java
+++ b/index-common/src/main/java/com/metamx/druid/index/column/SimpleDictionaryEncodedColumn.java
@@ -44,7 +44,7 @@
   }
 
   @Override
-  public int size()
+  public int length()
   {
     return hasMultipleValues() ? multiValueColumn.size() : column.size();
   }
diff --git a/index-common/src/main/java/com/metamx/druid/index/column/StringMultiValueColumn.java b/index-common/src/main/java/com/metamx/druid/index/column/StringMultiValueColumn.java
index 053bcee..79327cb 100644
--- a/index-common/src/main/java/com/metamx/druid/index/column/StringMultiValueColumn.java
+++ b/index-common/src/main/java/com/metamx/druid/index/column/StringMultiValueColumn.java
@@ -67,7 +67,7 @@
     return new DictionaryEncodedColumn()
     {
       @Override
-      public int size()
+      public int length()
       {
         return column.size();
       }
diff --git a/index-common/src/main/java/com/metamx/druid/index/serde/BitmapIndexColumnPartSupplier.java b/index-common/src/main/java/com/metamx/druid/index/serde/BitmapIndexColumnPartSupplier.java
index 6813541..2381639 100644
--- a/index-common/src/main/java/com/metamx/druid/index/serde/BitmapIndexColumnPartSupplier.java
+++ b/index-common/src/main/java/com/metamx/druid/index/serde/BitmapIndexColumnPartSupplier.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Supplier;
 import com.metamx.druid.index.column.BitmapIndex;
 import com.metamx.druid.kv.GenericIndexed;
+import com.metamx.druid.kv.Indexed;
 import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
 
 /**
@@ -47,6 +48,18 @@
     return new BitmapIndex()
     {
       @Override
+      public int getCardinality()
+      {
+        return dictionary.size();
+      }
+
+      @Override
+      public String getValue(int index)
+      {
+        return dictionary.get(index);
+      }
+
+      @Override
       public ImmutableConciseSet getConciseSet(String value)
       {
         final int index = dictionary.indexOf(value);
diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java
index afedbb6..2ea6370 100644
--- a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java
+++ b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java
@@ -649,8 +649,12 @@
       }
 
       Set<String> colSet = Sets.newTreeSet();
-      colSet.addAll(Lists.newArrayList(index.getAvailableDimensions()));
-      colSet.addAll(Lists.newArrayList(index.getAvailableMetrics()));
+      for (String dimension : index.getAvailableDimensions()) {
+        colSet.add(dimension.toLowerCase());
+      }
+      for (String metric : index.getAvailableMetrics()) {
+        colSet.add(metric.toLowerCase());
+      }
 
       String[] cols = colSet.toArray(new String[colSet.size()]);
 
diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/serde/ComplexMetricSerde.java b/index-common/src/main/java/com/metamx/druid/index/v1/serde/ComplexMetricSerde.java
index 7473aae..e54fb1c 100644
--- a/index-common/src/main/java/com/metamx/druid/index/v1/serde/ComplexMetricSerde.java
+++ b/index-common/src/main/java/com/metamx/druid/index/v1/serde/ComplexMetricSerde.java
@@ -19,6 +19,7 @@
 
 package com.metamx.druid.index.v1.serde;
 
+import com.google.common.base.Function;
 import com.metamx.druid.index.column.ColumnBuilder;
 import com.metamx.druid.index.serde.ColumnPartSerde;
 import com.metamx.druid.kv.ObjectStrategy;
@@ -27,10 +28,10 @@
 
 /**
  */
-public interface ComplexMetricSerde
+public abstract class ComplexMetricSerde
 {
-  public String getTypeName();
-  public ComplexMetricExtractor getExtractor();
+  public abstract String getTypeName();
+  public abstract ComplexMetricExtractor getExtractor();
 
   /**
    * Deserializes a ByteBuffer and adds it to the ColumnBuilder.  This method allows for the ComplexMetricSerde
@@ -42,7 +43,7 @@
    * @param buffer the buffer to deserialize
    * @return a ColumnPartSerde that can serialize out the object that was read from the buffer to the builder
    */
-  public ColumnPartSerde deserializeColumn(ByteBuffer buffer, ColumnBuilder builder);
+  public abstract ColumnPartSerde deserializeColumn(ByteBuffer buffer, ColumnBuilder builder);
 
   /**
    * This is deprecated because its usage is going to be removed from the code.
@@ -55,5 +56,20 @@
    * @return an ObjectStrategy as used by GenericIndexed
    */
   @Deprecated
-  public ObjectStrategy getObjectStrategy();
+  public abstract ObjectStrategy getObjectStrategy();
+
+
+  /**
+   * Returns a function that can convert the Object provided by the ComplexColumn created through deserializeColumn
+   * into a number of expected input bytes to produce that object.
+   *
+   * This is used to approximate the size of the input data via the SegmentMetadataQuery and does not need to be
+   * overridden if you do not care about the query.
+   *
+   * @return A function that can compute the size of the complex object or null if you cannot/do not want to compute it
+   */
+  public Function<Object, Long> inputSizeFn()
+  {
+    return null;
+  }
 }
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java
index 02bd6ac..f4df5e0 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java
@@ -33,7 +33,6 @@
 import com.metamx.common.concurrent.ScheduledExecutors;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.druid.Query;
-import com.metamx.druid.StorageAdapter;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.DruidServer;
 import com.metamx.druid.client.ServerView;
@@ -44,9 +43,6 @@
 import com.metamx.druid.index.v1.IndexGranularity;
 import com.metamx.druid.index.v1.IndexIO;
 import com.metamx.druid.index.v1.IndexMerger;
-import com.metamx.druid.index.v1.MMappedIndex;
-import com.metamx.druid.index.v1.MMappedIndexQueryableIndex;
-import com.metamx.druid.index.v1.MMappedIndexStorageAdapter;
 import com.metamx.druid.loading.SegmentPusher;
 import com.metamx.druid.query.MetricsEmittingQueryRunner;
 import com.metamx.druid.query.QueryRunner;
diff --git a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java
index ce4e5f4..3bde079 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java
@@ -29,7 +29,6 @@
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.collect.CountingMap;
 import com.metamx.druid.index.Segment;
-import com.metamx.druid.index.v1.SegmentIdAttachedStorageAdapter;
 import com.metamx.druid.loading.SegmentLoader;
 import com.metamx.druid.loading.StorageAdapterLoadingException;
 import com.metamx.druid.partition.PartitionChunk;
@@ -330,7 +329,7 @@
               }
             },
             new BySegmentQueryRunner<T>(
-                adapter.getSegmentIdentifier(),
+                adapter.getIdentifier(),
                 adapter.getDataInterval().getStart(),
                 factory.createRunner(adapter)
             )
diff --git a/server/src/main/java/com/metamx/druid/index/IncrementalIndexSegment.java b/server/src/main/java/com/metamx/druid/index/IncrementalIndexSegment.java
index 19b71c0..79c7dc0 100644
--- a/server/src/main/java/com/metamx/druid/index/IncrementalIndexSegment.java
+++ b/server/src/main/java/com/metamx/druid/index/IncrementalIndexSegment.java
@@ -38,7 +38,7 @@
   }
 
   @Override
-  public String getSegmentIdentifier()
+  public String getIdentifier()
   {
     throw new UnsupportedOperationException();
   }
diff --git a/server/src/main/java/com/metamx/druid/index/QueryableIndexSegment.java b/server/src/main/java/com/metamx/druid/index/QueryableIndexSegment.java
index 8df88fc..770eb78 100644
--- a/server/src/main/java/com/metamx/druid/index/QueryableIndexSegment.java
+++ b/server/src/main/java/com/metamx/druid/index/QueryableIndexSegment.java
@@ -37,7 +37,7 @@
   }
 
   @Override
-  public String getSegmentIdentifier()
+  public String getIdentifier()
   {
     return identifier;
   }
diff --git a/server/src/main/java/com/metamx/druid/index/Segment.java b/server/src/main/java/com/metamx/druid/index/Segment.java
index 7bf0acc..b2edda9 100644
--- a/server/src/main/java/com/metamx/druid/index/Segment.java
+++ b/server/src/main/java/com/metamx/druid/index/Segment.java
@@ -26,7 +26,7 @@
  */
 public interface Segment
 {
-  public String getSegmentIdentifier();
+  public String getIdentifier();
   public Interval getDataInterval();
   public QueryableIndex asQueryableIndex();
   public StorageAdapter asStorageAdapter();
diff --git a/server/src/main/java/com/metamx/druid/index/v1/MMappedIndexQueryableIndex.java b/server/src/main/java/com/metamx/druid/index/v1/MMappedIndexQueryableIndex.java
deleted file mode 100644
index 3c9d62d..0000000
--- a/server/src/main/java/com/metamx/druid/index/v1/MMappedIndexQueryableIndex.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012  Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package com.metamx.druid.index.v1;
-
-import com.metamx.druid.index.QueryableIndex;
-import com.metamx.druid.index.column.Column;
-import com.metamx.druid.index.column.ComplexColumnImpl;
-import com.metamx.druid.index.column.FloatColumn;
-import com.metamx.druid.index.column.LongColumn;
-import com.metamx.druid.index.column.StringMultiValueColumn;
-import com.metamx.druid.kv.Indexed;
-import com.metamx.druid.kv.VSizeIndexed;
-import org.joda.time.Interval;
-
-/**
- */
-public class MMappedIndexQueryableIndex implements QueryableIndex
-{
-  private final MMappedIndex index;
-
-  public MMappedIndexQueryableIndex(
-      MMappedIndex index
-  )
-  {
-    this.index = index;
-  }
-
-  public MMappedIndex getIndex()
-  {
-    return index;
-  }
-
-  @Override
-  public Interval getDataInterval()
-  {
-    return index.getDataInterval();
-  }
-
-  @Override
-  public int getNumRows()
-  {
-    return index.getTimestamps().size();
-  }
-
-  @Override
-  public Indexed<String> getColumnNames()
-  {
-    return null;
-  }
-
-  @Override
-  public Indexed<String> getAvailableDimensions()
-  {
-    return index.getAvailableDimensions();
-  }
-
-  @Override
-  public Column getTimeColumn()
-  {
-    return new LongColumn(index.timestamps);
-  }
-
-  @Override
-  public Column getColumn(String columnName)
-  {
-    final MetricHolder metricHolder = index.getMetricHolder(columnName);
-    if (metricHolder == null) {
-      final VSizeIndexed dimColumn = index.getDimColumn(columnName);
-      if (dimColumn == null) {
-        return null;
-      }
-
-      return new StringMultiValueColumn(
-          index.getDimValueLookup(columnName),
-          dimColumn,
-          index.getInvertedIndexes().get(columnName)
-      );
-    }
-    else if (metricHolder.getType() == MetricHolder.MetricType.FLOAT) {
-      return new FloatColumn(metricHolder.floatType);
-    }
-    else {
-      return new ComplexColumnImpl(metricHolder.getTypeName(), metricHolder.getComplexType());
-    }
-  }
-}
diff --git a/server/src/main/java/com/metamx/druid/index/v1/MMappedIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/MMappedIndexStorageAdapter.java
deleted file mode 100644
index 6cffaf1..0000000
--- a/server/src/main/java/com/metamx/druid/index/v1/MMappedIndexStorageAdapter.java
+++ /dev/null
@@ -1,666 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012  Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package com.metamx.druid.index.v1;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
-import com.metamx.common.collect.MoreIterators;
-import com.metamx.common.guava.FunctionalIterable;
-import com.metamx.common.guava.FunctionalIterator;
-import com.metamx.druid.BaseStorageAdapter;
-import com.metamx.druid.Capabilities;
-import com.metamx.druid.QueryGranularity;
-import com.metamx.druid.index.brita.BitmapIndexSelector;
-import com.metamx.druid.index.brita.Filter;
-import com.metamx.druid.index.v1.processing.Cursor;
-import com.metamx.druid.index.v1.processing.DimensionSelector;
-import com.metamx.druid.index.v1.processing.Offset;
-import com.metamx.druid.kv.Indexed;
-import com.metamx.druid.kv.IndexedFloats;
-import com.metamx.druid.kv.IndexedInts;
-import com.metamx.druid.kv.IndexedLongs;
-import com.metamx.druid.processing.ComplexMetricSelector;
-import com.metamx.druid.processing.FloatMetricSelector;
-import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-
-import java.io.Closeable;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- */
-public class MMappedIndexStorageAdapter extends BaseStorageAdapter
-{
-  private final MMappedIndex index;
-
-  public MMappedIndexStorageAdapter(
-      MMappedIndex index
-  )
-  {
-    this.index = index;
-  }
-
-  public MMappedIndex getIndex()
-  {
-    return index;
-  }
-
-  @Override
-  public String getSegmentIdentifier()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Interval getInterval()
-  {
-    return index.getDataInterval();
-  }
-
-  @Override
-  public int getDimensionCardinality(String dimension)
-  {
-    final Indexed<String> dimValueLookup = index.getDimValueLookup(dimension.toLowerCase());
-    if (dimValueLookup == null) {
-      return 0;
-    }
-    return dimValueLookup.size();
-  }
-
-  @Override
-  public DateTime getMinTime()
-  {
-    final IndexedLongs timestamps = index.getReadOnlyTimestamps();
-    final DateTime retVal = new DateTime(timestamps.get(0));
-    Closeables.closeQuietly(timestamps);
-    return retVal;
-  }
-
-  @Override
-  public DateTime getMaxTime()
-  {
-    final IndexedLongs timestamps = index.getReadOnlyTimestamps();
-    final DateTime retVal = new DateTime(timestamps.get(timestamps.size() - 1));
-    Closeables.closeQuietly(timestamps);
-    return retVal;
-  }
-
-  @Override
-  public Capabilities getCapabilities()
-  {
-    return Capabilities.builder().dimensionValuesSorted(true).build();
-  }
-
-  @Override
-  public Iterable<Cursor> makeCursors(Filter filter, Interval interval, QueryGranularity gran)
-  {
-    Interval actualInterval = interval;
-    if (!actualInterval.overlaps(index.dataInterval)) {
-      return ImmutableList.of();
-    }
-
-    if (actualInterval.getStart().isBefore(index.dataInterval.getStart())) {
-      actualInterval = actualInterval.withStart(index.dataInterval.getStart());
-    }
-    if (actualInterval.getEnd().isAfter(index.dataInterval.getEnd())) {
-      actualInterval = actualInterval.withEnd(index.dataInterval.getEnd());
-    }
-
-    final Iterable<Cursor> iterable;
-    if (filter == null) {
-      iterable = new NoFilterCursorIterable(index, actualInterval, gran);
-    } else {
-      Offset offset = new ConciseOffset(filter.goConcise(new MMappedBitmapIndexSelector(index)));
-
-      iterable = new CursorIterable(index, actualInterval, gran, offset);
-    }
-
-    return FunctionalIterable.create(iterable).keep(Functions.<Cursor>identity());
-  }
-
-  @Override
-  public Indexed<String> getAvailableDimensions()
-  {
-    return index.getAvailableDimensions();
-  }
-
-  @Override
-  public Indexed<String> getDimValueLookup(String dimension)
-  {
-    return index.getDimValueLookup(dimension.toLowerCase());
-  }
-
-  @Override
-  public ImmutableConciseSet getInvertedIndex(String dimension, String dimVal)
-  {
-    return index.getInvertedIndex(dimension.toLowerCase(), dimVal);
-  }
-
-  @Override
-  public Offset getFilterOffset(Filter filter)
-  {
-    return new ConciseOffset(
-        filter.goConcise(
-            new MMappedBitmapIndexSelector(index)
-        )
-    );
-  }
-
-  private static class CursorIterable implements Iterable<Cursor>
-  {
-    private final MMappedIndex index;
-    private final Interval interval;
-    private final QueryGranularity gran;
-    private final Offset offset;
-
-    public CursorIterable(
-        MMappedIndex index,
-        Interval interval,
-        QueryGranularity gran,
-        Offset offset
-    )
-    {
-      this.index = index;
-      this.interval = interval;
-      this.gran = gran;
-      this.offset = offset;
-    }
-
-    @Override
-    public Iterator<Cursor> iterator()
-    {
-      final Offset baseOffset = offset.clone();
-
-      final Map<String, Object> metricHolderCache = Maps.newHashMap();
-      final IndexedLongs timestamps = index.getReadOnlyTimestamps();
-
-      final FunctionalIterator<Cursor> retVal = FunctionalIterator
-          .create(gran.iterable(interval.getStartMillis(), interval.getEndMillis()).iterator())
-          .transform(
-              new Function<Long, Cursor>()
-              {
-
-                @Override
-                public Cursor apply(final Long input)
-                {
-                  final long timeStart = Math.max(interval.getStartMillis(), input);
-                  while (baseOffset.withinBounds()
-                         && timestamps.get(baseOffset.getOffset()) < timeStart) {
-                    baseOffset.increment();
-                  }
-
-                  final Offset offset = new TimestampCheckingOffset(
-                      baseOffset, timestamps, Math.min(interval.getEndMillis(), gran.next(timeStart))
-                  );
-
-                  return new Cursor()
-                  {
-                    private final Offset initOffset = offset.clone();
-                    private final DateTime myBucket = gran.toDateTime(input);
-                    private Offset cursorOffset = offset;
-
-                    @Override
-                    public DateTime getTime()
-                    {
-                      return myBucket;
-                    }
-
-                    @Override
-                    public void advance()
-                    {
-                      cursorOffset.increment();
-                    }
-
-                    @Override
-                    public boolean isDone()
-                    {
-                      return !cursorOffset.withinBounds();
-                    }
-
-                    @Override
-                    public void reset()
-                    {
-                      cursorOffset = initOffset.clone();
-                    }
-
-                    @Override
-                    public DimensionSelector makeDimensionSelector(String dimension)
-                    {
-                      final String dimensionName = dimension.toLowerCase();
-                      final Indexed<? extends IndexedInts> rowVals = index.getDimColumn(dimensionName);
-                      final Indexed<String> dimValueLookup = index.getDimValueLookup(dimensionName);
-
-                      if (rowVals == null) {
-                        return null;
-                      }
-
-                      return new DimensionSelector()
-                      {
-                        @Override
-                        public IndexedInts getRow()
-                        {
-                          return rowVals.get(cursorOffset.getOffset());
-                        }
-
-                        @Override
-                        public int getValueCardinality()
-                        {
-                          return dimValueLookup.size();
-                        }
-
-                        @Override
-                        public String lookupName(int id)
-                        {
-                          final String retVal = dimValueLookup.get(id);
-                          return retVal == null ? "" : retVal;
-                        }
-
-                        @Override
-                        public int lookupId(String name)
-                        {
-                          return ("".equals(name)) ? dimValueLookup.indexOf(null) : dimValueLookup.indexOf(name);
-                        }
-                      };
-                    }
-
-                    @Override
-                    public FloatMetricSelector makeFloatMetricSelector(String metric)
-                    {
-                      final String metricName = metric.toLowerCase();
-                      IndexedFloats cachedMetricVals = (IndexedFloats) metricHolderCache.get(metricName);
-
-                      if (cachedMetricVals == null) {
-                        MetricHolder holder = index.getMetricHolder(metricName);
-                        if (holder != null) {
-                          cachedMetricVals = holder.getFloatType();
-                          metricHolderCache.put(metricName, cachedMetricVals);
-                        }
-                      }
-
-                      if (cachedMetricVals == null) {
-                        return new FloatMetricSelector()
-                        {
-                          @Override
-                          public float get()
-                          {
-                            return 0.0f;
-                          }
-                        };
-                      }
-
-                      final IndexedFloats metricVals = cachedMetricVals;
-                      return new FloatMetricSelector()
-                      {
-                        @Override
-                        public float get()
-                        {
-                          return metricVals.get(cursorOffset.getOffset());
-                        }
-                      };
-                    }
-
-                    @Override
-                    public ComplexMetricSelector makeComplexMetricSelector(String metric)
-                    {
-                      final String metricName = metric.toLowerCase();
-                      Indexed cachedMetricVals = (Indexed) metricHolderCache.get(metricName);
-
-                      if (cachedMetricVals == null) {
-                        MetricHolder holder = index.getMetricHolder(metricName);
-                        if (holder != null) {
-                          cachedMetricVals = holder.getComplexType();
-                          metricHolderCache.put(metricName, cachedMetricVals);
-                        }
-                      }
-
-                      if (cachedMetricVals == null) {
-                        return null;
-                      }
-
-                      final Indexed metricVals = cachedMetricVals;
-                      return new ComplexMetricSelector()
-                      {
-                        @Override
-                        public Class classOfObject()
-                        {
-                          return metricVals.getClazz();
-                        }
-
-                        @Override
-                        public Object get()
-                        {
-                          return metricVals.get(cursorOffset.getOffset());
-                        }
-                      };
-                    }
-                  };
-                }
-              }
-          );
-
-      // This after call is not perfect, if there is an exception during processing, it will never get called,
-      // but it's better than nothing and doing this properly all the time requires a lot more fixerating
-      return MoreIterators.after(
-          retVal,
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              Closeables.closeQuietly(timestamps);
-              for (Object object : metricHolderCache.values()) {
-                if (object instanceof Closeable) {
-                  Closeables.closeQuietly((Closeable) object);
-                }
-              }
-            }
-          }
-      );
-    }
-  }
-
-  private static class TimestampCheckingOffset implements Offset
-  {
-    private final Offset baseOffset;
-    private final IndexedLongs timestamps;
-    private final long threshold;
-
-    public TimestampCheckingOffset(
-        Offset baseOffset,
-        IndexedLongs timestamps,
-        long threshold
-    )
-    {
-      this.baseOffset = baseOffset;
-      this.timestamps = timestamps;
-      this.threshold = threshold;
-    }
-
-    @Override
-    public int getOffset()
-    {
-      return baseOffset.getOffset();
-    }
-
-    @Override
-    public Offset clone()
-    {
-      return new TimestampCheckingOffset(baseOffset.clone(), timestamps, threshold);
-    }
-
-    @Override
-    public boolean withinBounds()
-    {
-      return baseOffset.withinBounds() && timestamps.get(baseOffset.getOffset()) < threshold;
-    }
-
-    @Override
-    public void increment()
-    {
-      baseOffset.increment();
-    }
-  }
-
-  private static class NoFilterCursorIterable implements Iterable<Cursor>
-  {
-    private final MMappedIndex index;
-    private final Interval interval;
-    private final QueryGranularity gran;
-
-    public NoFilterCursorIterable(
-        MMappedIndex index,
-        Interval interval,
-        QueryGranularity gran
-    )
-    {
-      this.index = index;
-      this.interval = interval;
-      this.gran = gran;
-    }
-
-    /**
-     * This produces iterators of Cursor objects that must be fully processed (until isDone() returns true) before the
-     * next Cursor is processed.  It is *not* safe to pass these cursors off to another thread for parallel processing
-     *
-     * @return
-     */
-    @Override
-    public Iterator<Cursor> iterator()
-    {
-      final Map<String, Object> metricCacheMap = Maps.newHashMap();
-      final IndexedLongs timestamps = index.getReadOnlyTimestamps();
-
-      final FunctionalIterator<Cursor> retVal = FunctionalIterator
-          .create(gran.iterable(interval.getStartMillis(), interval.getEndMillis()).iterator())
-          .transform(
-              new Function<Long, Cursor>()
-              {
-                private int currRow = 0;
-
-                @Override
-                public Cursor apply(final Long input)
-                {
-                  final long timeStart = Math.max(interval.getStartMillis(), input);
-                  while (currRow < timestamps.size() && timestamps.get(currRow) < timeStart) {
-                    ++currRow;
-                  }
-
-                  return new Cursor()
-                  {
-                    private final DateTime myBucket = gran.toDateTime(input);
-                    private final long nextBucket = Math.min(gran.next(myBucket.getMillis()), interval.getEndMillis());
-                    private final int initRow = currRow;
-
-                    @Override
-                    public DateTime getTime()
-                    {
-                      return myBucket;
-                    }
-
-                    @Override
-                    public void advance()
-                    {
-                      ++currRow;
-                    }
-
-                    @Override
-                    public boolean isDone()
-                    {
-                      return currRow >= timestamps.size() || timestamps.get(currRow) >= nextBucket;
-                    }
-
-                    @Override
-                    public void reset()
-                    {
-                      currRow = initRow;
-                    }
-
-                    @Override
-                    public DimensionSelector makeDimensionSelector(final String dimension)
-                    {
-                      final String dimensionName = dimension.toLowerCase();
-                      final Indexed<? extends IndexedInts> rowVals = index.getDimColumn(dimensionName);
-                      final Indexed<String> dimValueLookup = index.getDimValueLookup(dimensionName);
-
-                      if (rowVals == null) {
-                        return null;
-                      }
-
-                      return new DimensionSelector()
-                      {
-                        @Override
-                        public IndexedInts getRow()
-                        {
-                          return rowVals.get(currRow);
-                        }
-
-                        @Override
-                        public int getValueCardinality()
-                        {
-                          return dimValueLookup.size();
-                        }
-
-                        @Override
-                        public String lookupName(int id)
-                        {
-                          final String retVal = dimValueLookup.get(id);
-                          return retVal == null ? "" : retVal;
-                        }
-
-                        @Override
-                        public int lookupId(String name)
-                        {
-                          return ("".equals(name)) ? dimValueLookup.indexOf(null) : dimValueLookup.indexOf(name);
-                        }
-                      };
-                    }
-
-                    @Override
-                    public FloatMetricSelector makeFloatMetricSelector(String metric)
-                    {
-                      final String metricName = metric.toLowerCase();
-                      IndexedFloats cachedMetricVals = (IndexedFloats) metricCacheMap.get(metricName);
-
-                      if (cachedMetricVals == null) {
-                        final MetricHolder metricHolder = index.getMetricHolder(metricName);
-                        if (metricHolder != null) {
-                          cachedMetricVals = metricHolder.getFloatType();
-                          if (cachedMetricVals != null) {
-                            metricCacheMap.put(metricName, cachedMetricVals);
-                          }
-                        }
-                      }
-
-                      if (cachedMetricVals == null) {
-                        return new FloatMetricSelector()
-                        {
-                          @Override
-                          public float get()
-                          {
-                            return 0.0f;
-                          }
-                        };
-                      }
-
-                      final IndexedFloats metricVals = cachedMetricVals;
-                      return new FloatMetricSelector()
-                      {
-                        @Override
-                        public float get()
-                        {
-                          return metricVals.get(currRow);
-                        }
-                      };
-                    }
-
-                    @Override
-                    public ComplexMetricSelector makeComplexMetricSelector(String metric)
-                    {
-                      final String metricName = metric.toLowerCase();
-                      Indexed cachedMetricVals = (Indexed) metricCacheMap.get(metricName);
-
-                      if (cachedMetricVals == null) {
-                        final MetricHolder metricHolder = index.getMetricHolder(metricName);
-
-                        if (metricHolder != null) {
-                          cachedMetricVals = metricHolder.getComplexType();
-                          if (cachedMetricVals != null) {
-                            metricCacheMap.put(metricName, cachedMetricVals);
-                          }
-                        }
-                      }
-
-                      if (cachedMetricVals == null) {
-                        return null;
-                      }
-
-                      final Indexed metricVals = cachedMetricVals;
-                      return new ComplexMetricSelector()
-                      {
-                        @Override
-                        public Class classOfObject()
-                        {
-                          return metricVals.getClazz();
-                        }
-
-                        @Override
-                        public Object get()
-                        {
-                          return metricVals.get(currRow);
-                        }
-                      };
-                    }
-                  };
-                }
-              }
-          );
-
-      return MoreIterators.after(
-          retVal,
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              Closeables.closeQuietly(timestamps);
-              for (Object object : metricCacheMap.values()) {
-                if (object instanceof Closeable) {
-                  Closeables.closeQuietly((Closeable) object);
-                }
-              }
-            }
-          }
-      );
-    }
-  }
-
-  private static class MMappedBitmapIndexSelector implements BitmapIndexSelector
-  {
-    private final MMappedIndex index;
-
-    public MMappedBitmapIndexSelector(final MMappedIndex index)
-    {
-      this.index = index;
-    }
-
-    @Override
-    public Indexed<String> getDimensionValues(String dimension)
-    {
-      return index.getDimValueLookup(dimension.toLowerCase());
-    }
-
-    @Override
-    public int getNumRows()
-    {
-      return index.getReadOnlyTimestamps().size();
-    }
-
-    @Override
-    public ImmutableConciseSet getConciseInvertedIndex(String dimension, String value)
-    {
-      return index.getInvertedIndex(dimension.toLowerCase(), value);
-    }
-  }
-}
diff --git a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java
index 445dd93..9f159eb 100644
--- a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java
+++ b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java
@@ -113,7 +113,7 @@
     GenericColumn column = null;
     try {
       column = index.getTimeColumn().getGenericColumn();
-      return new DateTime(column.getLongSingleValueRow(column.size() - 1));
+      return new DateTime(column.getLongSingleValueRow(column.length() - 1));
     }
     finally {
       Closeables.closeQuietly(column);
@@ -572,7 +572,7 @@
                 public Cursor apply(final Long input)
                 {
                   final long timeStart = Math.max(interval.getStartMillis(), input);
-                  while (currRow < timestamps.size() && timestamps.getLongSingleValueRow(currRow) < timeStart) {
+                  while (currRow < timestamps.length() && timestamps.getLongSingleValueRow(currRow) < timeStart) {
                     ++currRow;
                   }
 
@@ -597,7 +597,7 @@
                     @Override
                     public boolean isDone()
                     {
-                      return currRow >= timestamps.size() || timestamps.getLongSingleValueRow(currRow) >= nextBucket;
+                      return currRow >= timestamps.length() || timestamps.getLongSingleValueRow(currRow) >= nextBucket;
                     }
 
                     @Override
@@ -848,7 +848,7 @@
       GenericColumn column = null;
       try {
         column = index.getTimeColumn().getGenericColumn();
-        return column.size();
+        return column.length();
       }
       finally {
         Closeables.closeQuietly(column);
diff --git a/server/src/main/java/com/metamx/druid/query/metadata/SegmentAnalyzer.java b/server/src/main/java/com/metamx/druid/query/metadata/SegmentAnalyzer.java
new file mode 100644
index 0000000..a219dba
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/query/metadata/SegmentAnalyzer.java
@@ -0,0 +1,160 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.query.metadata;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Floats;
+import com.google.common.primitives.Longs;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.index.QueryableIndex;
+import com.metamx.druid.index.column.BitmapIndex;
+import com.metamx.druid.index.column.Column;
+import com.metamx.druid.index.column.ColumnCapabilities;
+import com.metamx.druid.index.column.ComplexColumn;
+import com.metamx.druid.index.column.ValueType;
+import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
+import com.metamx.druid.index.v1.serde.ComplexMetrics;
+
+import java.util.Map;
+
+
+public class SegmentAnalyzer
+{
+  private static final Logger log = new Logger(SegmentAnalyzer.class);
+
+  /**
+   * This is based on the minimum size of a timestamp (POSIX seconds).  An ISO timestamp will actually be more like 24+
+   */
+  private static final int NUM_BYTES_IN_TIMESTAMP = 10;
+
+  /**
+   * This is based on assuming 6 units of precision, one decimal point and a single value left of the decimal
+   */
+  private static final int NUM_BYTES_IN_TEXT_FLOAT = 8;
+
+  public Map<String, ColumnAnalysis> analyze(QueryableIndex index)
+  {
+    Preconditions.checkNotNull(index, "Index cannot be null");
+
+    Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
+
+    for (String columnName : index.getColumnNames()) {
+      final Column column = index.getColumn(columnName);
+      final ColumnCapabilities capabilities = column.getCapabilities();
+
+      final ColumnAnalysis analysis;
+      final ValueType type = capabilities.getType();
+      switch(type) {
+        case LONG:
+          analysis = analyzeLongColumn(column);
+          break;
+        case FLOAT:
+          analysis = analyzeFloatColumn(column);
+          break;
+        case STRING:
+          analysis = analyzeStringColumn(column);
+          break;
+        case COMPLEX:
+          analysis = analyzeComplexColumn(column);
+          break;
+        default:
+          log.warn("Unknown column type[%s].", type);
+          analysis = ColumnAnalysis.error(String.format("unknown_type_%s", type));
+      }
+
+      columns.put(columnName, analysis);
+    }
+
+    columns.put("__time", lengthBasedAnalysis(index.getTimeColumn(), NUM_BYTES_IN_TIMESTAMP));
+
+    return columns;
+  }
+
+  public ColumnAnalysis analyzeLongColumn(Column column)
+  {
+    return lengthBasedAnalysis(column, Longs.BYTES);
+  }
+
+  public ColumnAnalysis analyzeFloatColumn(Column column)
+  {
+    return lengthBasedAnalysis(column, NUM_BYTES_IN_TEXT_FLOAT);
+  }
+
+  private ColumnAnalysis lengthBasedAnalysis(Column column, final int numBytes)
+  {
+    final ColumnCapabilities capabilities = column.getCapabilities();
+    if (capabilities.hasMultipleValues()) {
+      return ColumnAnalysis.error("multi_value");
+    }
+
+    return new ColumnAnalysis(capabilities.getType(), column.getLength() * numBytes, null);
+  }
+
+  public ColumnAnalysis analyzeStringColumn(Column column)
+  {
+    final ColumnCapabilities capabilities = column.getCapabilities();
+
+    if (capabilities.hasBitmapIndexes()) {
+      final BitmapIndex bitmapIndex = column.getBitmapIndex();
+
+      int cardinality = bitmapIndex.getCardinality();
+      long size = 0;
+      for (int i = 0; i < cardinality; ++i) {
+        String value = bitmapIndex.getValue(i);
+
+        if (value != null) {
+          size += value.getBytes(Charsets.UTF_8).length * bitmapIndex.getConciseSet(value).size();
+        }
+      }
+
+      return new ColumnAnalysis(capabilities.getType(), size, cardinality);
+    }
+
+    return ColumnAnalysis.error("string_no_bitmap");
+  }
+
+  public ColumnAnalysis analyzeComplexColumn(Column column)
+  {
+    final ColumnCapabilities capabilities = column.getCapabilities();
+    final ComplexColumn complexColumn = column.getComplexColumn();
+
+    final String typeName = complexColumn.getTypeName();
+    final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
+    if (serde == null) {
+      return ColumnAnalysis.error(String.format("unknown_complex_%s", typeName));
+    }
+
+    final Function<Object, Long> inputSizeFn = serde.inputSizeFn();
+    if (inputSizeFn == null) {
+      return ColumnAnalysis.error("noSizeFn");
+    }
+
+    final int length = column.getLength();
+    long size = 0;
+    for (int i = 0; i < length; ++i) {
+      size += inputSizeFn.apply(complexColumn.getRowValue(i));
+    }
+
+    return new ColumnAnalysis(capabilities.getType(), size, null);
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryEngine.java b/server/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryEngine.java
deleted file mode 100644
index 7522b4b..0000000
--- a/server/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryEngine.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012  Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package com.metamx.druid.query.metadata;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.metamx.common.IAE;
-import com.metamx.common.guava.Sequence;
-import com.metamx.common.guava.Sequences;
-import com.metamx.common.guava.SimpleSequence;
-import com.metamx.druid.BaseStorageAdapter;
-import com.metamx.druid.StorageAdapter;
-import com.metamx.druid.index.v1.SegmentIdAttachedStorageAdapter;
-import com.metamx.druid.kv.Indexed;
-import com.metamx.druid.result.Result;
-import com.metamx.druid.result.SegmentMetadataResultValue;
-import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
-import org.joda.time.Interval;
-
-import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.List;
-
-
-
-public class SegmentMetadataQueryEngine
-{
-  public Sequence<Result<SegmentMetadataResultValue>> process(
-      final SegmentMetadataQuery query,
-      StorageAdapter storageAdapter
-  )
-  {
-    final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
-    if (intervals.size() != 1) {
-      throw new IAE("Should only have one interval, got[%s]", intervals);
-    }
-
-    if(!(storageAdapter instanceof SegmentIdAttachedStorageAdapter) ||
-       !(((SegmentIdAttachedStorageAdapter)storageAdapter).getDelegate() instanceof BaseStorageAdapter)) {
-      return Sequences.empty();
-    }
-
-    final BaseStorageAdapter adapter = (BaseStorageAdapter)
-        ((SegmentIdAttachedStorageAdapter) storageAdapter).getDelegate();
-
-    Function<String, SegmentMetadataResultValue.Dimension> sizeDimension = new Function<String, SegmentMetadataResultValue.Dimension>()
-    {
-      @Override
-      public SegmentMetadataResultValue.Dimension apply(@Nullable String input)
-      {
-        long size = 0;
-        final Indexed<String> lookup = adapter.getDimValueLookup(input);
-        for (String dimVal : lookup) {
-          ImmutableConciseSet index = adapter.getInvertedIndex(input, dimVal);
-          size += (dimVal == null) ? 0 : index.size() * Charsets.UTF_8.encode(dimVal).capacity();
-        }
-        return new SegmentMetadataResultValue.Dimension(
-            size,
-            adapter.getDimensionCardinality(input)
-        );
-      }
-    };
-
-    // TODO: add metric storage size
-
-    long totalSize = 0;
-
-    HashMap<String, SegmentMetadataResultValue.Dimension> dimensions = Maps.newHashMap();
-    for(String input : adapter.getAvailableDimensions()) {
-      SegmentMetadataResultValue.Dimension d = sizeDimension.apply(input);
-      dimensions.put(input, d);
-      totalSize += d.size;
-    }
-
-    return new SimpleSequence<Result<SegmentMetadataResultValue>>(
-        ImmutableList.of(
-            new Result<SegmentMetadataResultValue>(
-                adapter.getMinTime(),
-                new SegmentMetadataResultValue(
-                    storageAdapter.getSegmentIdentifier(),
-                    dimensions,
-                    ImmutableMap.<String, SegmentMetadataResultValue.Metric>of(),
-                    totalSize
-                )
-            )
-        )
-    );
-  }
-}
\ No newline at end of file
diff --git a/server/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/server/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
index b616456..f44110f 100644
--- a/server/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
+++ b/server/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
@@ -21,83 +21,105 @@
 
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.metamx.common.ISE;
+import com.google.common.collect.Maps;
 import com.metamx.common.guava.ExecutorExecutingSequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.druid.Query;
-import com.metamx.druid.StorageAdapter;
+import com.metamx.druid.index.QueryableIndex;
 import com.metamx.druid.index.Segment;
 import com.metamx.druid.query.ConcatQueryRunner;
 import com.metamx.druid.query.QueryRunner;
 import com.metamx.druid.query.QueryRunnerFactory;
 import com.metamx.druid.query.QueryToolChest;
-import com.metamx.druid.query.metadata.SegmentMetadataQuery;
-import com.metamx.druid.query.metadata.SegmentMetadataQueryEngine;
-import com.metamx.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
-import com.metamx.druid.result.SegmentMetadataResultValue;
-import com.metamx.druid.result.Result;
 
+import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
-public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Result<SegmentMetadataResultValue>, SegmentMetadataQuery>
+public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery>
 {
-  private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest()
-  {
-    @Override
-    public QueryRunner<Result<SegmentMetadataResultValue>> mergeResults(QueryRunner<Result<SegmentMetadataResultValue>> runner)
-    {
-      return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(Sequences.simple(ImmutableList.of(runner)));
-    }
-  };
+  private static final SegmentAnalyzer analyzer = new SegmentAnalyzer();
 
+  private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest();
 
   @Override
-  public QueryRunner<Result<SegmentMetadataResultValue>> createRunner(final Segment adapter)
+  public QueryRunner<SegmentAnalysis> createRunner(final Segment segment)
   {
-    return new QueryRunner<Result<SegmentMetadataResultValue>>()
+    return new QueryRunner<SegmentAnalysis>()
     {
       @Override
-      public Sequence<Result<SegmentMetadataResultValue>> run(Query<Result<SegmentMetadataResultValue>> query)
+      public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ)
       {
-        if (!(query instanceof SegmentMetadataQuery)) {
-          throw new ISE("Got a [%s] which isn't a %s", query.getClass(), SegmentMetadataQuery.class);
+        SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
+
+        final QueryableIndex index = segment.asQueryableIndex();
+        if (index == null) {
+          return Sequences.empty();
         }
-        return new SegmentMetadataQueryEngine().process((SegmentMetadataQuery) query, adapter.asStorageAdapter());
+
+        final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(index);
+
+        // Initialize with the size of the whitespace, 1 byte per
+        long totalSize = analyzedColumns.size() * index.getNumRows();
+
+        Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
+        ColumnIncluderator includerator = query.getToInclude();
+        for (Map.Entry<String, ColumnAnalysis> entry : analyzedColumns.entrySet()) {
+          final String columnName = entry.getKey();
+          final ColumnAnalysis column = entry.getValue();
+
+          if (!column.isError()) {
+            totalSize += column.getSize();
+          }
+          if (includerator.include(columnName)) {
+            columns.put(columnName, column);
+          }
+        }
+
+        return Sequences.simple(
+            Arrays.asList(
+                new SegmentAnalysis(
+                    segment.getIdentifier(),
+                    Arrays.asList(segment.getDataInterval()),
+                    columns,
+                    totalSize
+                )
+            )
+        );
       }
     };
   }
 
   @Override
-  public QueryRunner<Result<SegmentMetadataResultValue>> mergeRunners(
-      final ExecutorService queryExecutor, Iterable<QueryRunner<Result<SegmentMetadataResultValue>>> queryRunners
+  public QueryRunner<SegmentAnalysis> mergeRunners(
+      final ExecutorService queryExecutor, Iterable<QueryRunner<SegmentAnalysis>> queryRunners
   )
   {
-    return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(
+    return new ConcatQueryRunner<SegmentAnalysis>(
             Sequences.map(
                 Sequences.simple(queryRunners),
-                new Function<QueryRunner<Result<SegmentMetadataResultValue>>, QueryRunner<Result<SegmentMetadataResultValue>>>()
+                new Function<QueryRunner<SegmentAnalysis>, QueryRunner<SegmentAnalysis>>()
                 {
                   @Override
-                  public QueryRunner<Result<SegmentMetadataResultValue>> apply(final QueryRunner<Result<SegmentMetadataResultValue>> input)
+                  public QueryRunner<SegmentAnalysis> apply(final QueryRunner<SegmentAnalysis> input)
                   {
-                    return new QueryRunner<Result<SegmentMetadataResultValue>>()
+                    return new QueryRunner<SegmentAnalysis>()
                     {
                       @Override
-                      public Sequence<Result<SegmentMetadataResultValue>> run(final Query<Result<SegmentMetadataResultValue>> query)
+                      public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
                       {
 
-                        Future<Sequence<Result<SegmentMetadataResultValue>>> future = queryExecutor.submit(
-                            new Callable<Sequence<Result<SegmentMetadataResultValue>>>()
+                        Future<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
+                            new Callable<Sequence<SegmentAnalysis>>()
                             {
                               @Override
-                              public Sequence<Result<SegmentMetadataResultValue>> call() throws Exception
+                              public Sequence<SegmentAnalysis> call() throws Exception
                               {
-                                return new ExecutorExecutingSequence<Result<SegmentMetadataResultValue>>(
+                                return new ExecutorExecutingSequence<SegmentAnalysis>(
                                     input.run(query),
                                     queryExecutor
                                 );
@@ -122,7 +144,7 @@
   }
 
   @Override
-  public QueryToolChest getToolchest()
+  public QueryToolChest<SegmentAnalysis, SegmentMetadataQuery> getToolchest()
   {
     return toolChest;
   }
diff --git a/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java b/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java
index 3e3a36b..ed8912d 100644
--- a/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java
+++ b/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java
@@ -328,7 +328,7 @@
     }
 
     @Override
-    public String getSegmentIdentifier()
+    public String getIdentifier()
     {
       return version;
     }
@@ -421,7 +421,7 @@
     }
 
     @Override
-    public CacheStrategy<T, QueryType> getCacheStrategy(QueryType query)
+    public <Typer> CacheStrategy<T, Typer, QueryType> getCacheStrategy(QueryType query)
     {
       return null;
     }
diff --git a/server/src/test/java/com/metamx/druid/loading/NoopSegmentLoader.java b/server/src/test/java/com/metamx/druid/loading/NoopSegmentLoader.java
index dc1f640..29d784d 100644
--- a/server/src/test/java/com/metamx/druid/loading/NoopSegmentLoader.java
+++ b/server/src/test/java/com/metamx/druid/loading/NoopSegmentLoader.java
@@ -35,7 +35,7 @@
     return new Segment()
     {
       @Override
-      public String getSegmentIdentifier()
+      public String getIdentifier()
       {
         return segment.getIdentifier();
       }
diff --git a/server/src/test/java/com/metamx/druid/query/QueryRunnerTestHelper.java b/server/src/test/java/com/metamx/druid/query/QueryRunnerTestHelper.java
index de3734c..69e7418 100644
--- a/server/src/test/java/com/metamx/druid/query/QueryRunnerTestHelper.java
+++ b/server/src/test/java/com/metamx/druid/query/QueryRunnerTestHelper.java
@@ -22,7 +22,6 @@
 import com.google.common.collect.Lists;
 import com.metamx.druid.Query;
 import com.metamx.druid.QueryGranularity;
-import com.metamx.druid.StorageAdapter;
 import com.metamx.druid.aggregation.AggregatorFactory;
 import com.metamx.druid.aggregation.CountAggregatorFactory;
 import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
@@ -35,13 +34,6 @@
 import com.metamx.druid.index.QueryableIndexSegment;
 import com.metamx.druid.index.Segment;
 import com.metamx.druid.index.v1.IncrementalIndex;
-import com.metamx.druid.index.v1.IncrementalIndexStorageAdapter;
-import com.metamx.druid.index.v1.Index;
-import com.metamx.druid.index.v1.IndexStorageAdapter;
-import com.metamx.druid.index.v1.MMappedIndex;
-import com.metamx.druid.index.v1.MMappedIndexQueryableIndex;
-import com.metamx.druid.index.v1.MMappedIndexStorageAdapter;
-import com.metamx.druid.index.v1.QueryableIndexStorageAdapter;
 import com.metamx.druid.index.v1.TestIndex;
 import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
 import com.metamx.druid.query.segment.QuerySegmentSpec;
@@ -132,7 +124,7 @@
     );
   }
 
-  private static <T> QueryRunner<T> makeQueryRunner(
+  public static <T> QueryRunner<T> makeQueryRunner(
       QueryRunnerFactory<T, Query<T>> factory,
       Segment adapter
   )
diff --git a/server/src/test/java/com/metamx/druid/query/metadata/SegmentAnalyzerTest.java b/server/src/test/java/com/metamx/druid/query/metadata/SegmentAnalyzerTest.java
new file mode 100644
index 0000000..6805bb6
--- /dev/null
+++ b/server/src/test/java/com/metamx/druid/query/metadata/SegmentAnalyzerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.query.metadata;
+
+import com.google.common.collect.Lists;
+import com.metamx.common.guava.Sequences;
+import com.metamx.druid.index.IncrementalIndexSegment;
+import com.metamx.druid.index.QueryableIndexSegment;
+import com.metamx.druid.index.Segment;
+import com.metamx.druid.index.column.ValueType;
+import com.metamx.druid.index.v1.TestIndex;
+import com.metamx.druid.query.QueryRunner;
+import com.metamx.druid.query.QueryRunnerFactory;
+import com.metamx.druid.query.QueryRunnerTestHelper;
+import com.metamx.druid.query.segment.QuerySegmentSpecs;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class SegmentAnalyzerTest
+{
+  @Test
+  public void testIncrementalDoesNotWork() throws Exception
+  {
+    final List<SegmentAnalysis> results = getSegmentAnalysises(
+        new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex())
+    );
+
+    Assert.assertEquals(0, results.size());
+  }
+
+  @Test
+  public void testMappedWorks() throws Exception
+  {
+    final List<SegmentAnalysis> results = getSegmentAnalysises(
+        new QueryableIndexSegment("test_1", TestIndex.getMMappedTestIndex())
+    );
+
+    Assert.assertEquals(1, results.size());
+
+    final SegmentAnalysis analysis = results.get(0);
+    Assert.assertEquals("test_1", analysis.getId());
+
+    final Map<String,ColumnAnalysis> columns = analysis.getColumns();
+    Assert.assertEquals(TestIndex.COLUMNS.length, columns.size()); // All columns including time
+
+    for (String dimension : TestIndex.DIMENSIONS) {
+      final ColumnAnalysis columnAnalysis = columns.get(dimension.toLowerCase());
+
+      Assert.assertEquals(dimension, ValueType.STRING.name(), columnAnalysis.getType());
+      Assert.assertTrue(dimension, columnAnalysis.getSize() > 0);
+      Assert.assertTrue(dimension, columnAnalysis.getCardinality() > 0);
+    }
+
+    for (String metric : TestIndex.METRICS) {
+      final ColumnAnalysis columnAnalysis = columns.get(metric.toLowerCase());
+
+      Assert.assertEquals(metric, ValueType.FLOAT.name(), columnAnalysis.getType());
+      Assert.assertTrue(metric, columnAnalysis.getSize() > 0);
+      Assert.assertNull(metric, columnAnalysis.getCardinality());
+    }
+  }
+
+  /**
+   * *Awesome* method name auto-generated by IntelliJ!  I love IntelliJ!
+   *
+   * @param index
+   * @return
+   */
+  private List<SegmentAnalysis> getSegmentAnalysises(Segment index)
+  {
+    final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
+        (QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(), index
+    );
+
+    final SegmentMetadataQuery query = new SegmentMetadataQuery(
+        "test", QuerySegmentSpecs.create("2011/2012"), null, null, null
+    );
+    return Sequences.toList(query.run(runner), Lists.<SegmentAnalysis>newArrayList());
+  }
+}