SOLR-2066: Added support for distributed grouping.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/branch_3x@1171968 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/SearchGroup.java b/lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/SearchGroup.java
index cf884fb..ce8ffd2 100644
--- a/lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/SearchGroup.java
+++ b/lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/SearchGroup.java
@@ -17,13 +17,13 @@
  * limitations under the License.
  */
 
-import java.io.IOException;
-import java.util.*;
-
 import org.apache.lucene.search.FieldComparator;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 
+import java.io.IOException;
+import java.util.*;
+
 /**
  * Represents a group that is found during the first pass search.
  *
@@ -46,6 +46,29 @@
     return("SearchGroup(groupValue=" + groupValue + " sortValues=" + Arrays.toString(sortValues) + ")");
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    SearchGroup that = (SearchGroup) o;
+
+    if (groupValue == null) {
+      if (that.groupValue != null) {
+        return false;
+      }
+    } else if (!groupValue.equals(that.groupValue)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return groupValue != null ? groupValue.hashCode() : 0;
+  }
+
   private static class ShardIter<T> {
     public final Iterator<SearchGroup<T>> iter;
     public final int shardIndex;
diff --git a/lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/TopGroups.java b/lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/TopGroups.java
index c5c376e..4d23c8d 100644
--- a/lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/TopGroups.java
+++ b/lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/TopGroups.java
@@ -17,13 +17,13 @@
  * limitations under the License.
  */
 
-import java.io.IOException;
-
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.TopDocs;
 
+import java.io.IOException;
+
 /** Represents result returned by a grouping search.
  *
  * @lucene.experimental */
@@ -70,8 +70,13 @@
    *  same groupSort and docSort, and the top groups passed
    *  to all second-pass collectors must be the same.
    *
-   * <b>NOTE</b>: this cannot merge totalGroupCount; ie the
-   * returned TopGroups will have null totalGroupCount.
+   * <b>NOTE</b>: We can't always compute an exact totalGroupCount.
+   * Documents belonging to a group may occur on more than
+   * one shard and thus the merged totalGroupCount can be
+   * higher than the actual totalGroupCount. In this case the
+   * totalGroupCount represents a upper bound. If the documents
+   * of one group do only reside in one shard then the
+   * totalGroupCount is exact.
    *
    * <b>NOTE</b>: the topDocs in each GroupDocs is actually
    * an instance of TopDocsAndShards
@@ -87,6 +92,8 @@
 
     int totalHitCount = 0;
     int totalGroupedHitCount = 0;
+    // Optionally merge the totalGroupCount.
+    Integer totalGroupCount = null;
 
     final int numGroups = shardGroups[0].groups.length;
     for(TopGroups<T> shard : shardGroups) {
@@ -95,6 +102,13 @@
       }
       totalHitCount += shard.totalHitCount;
       totalGroupedHitCount += shard.totalGroupedHitCount;
+      if (shard.totalGroupCount != null) {
+        if (totalGroupCount == null) {
+          totalGroupCount = 0;
+        }
+
+        totalGroupCount += shard.totalGroupCount;
+      }
     }
 
     @SuppressWarnings("unchecked")
@@ -156,10 +170,19 @@
                                                    shardGroups[0].groups[groupIDX].groupSortValues);
     }
 
-    return new TopGroups<T>(groupSort.getSort(),
-                            docSort == null ? null : docSort.getSort(),
-                            totalHitCount,
-                            totalGroupedHitCount,
-                            mergedGroupDocs);
+    if (totalGroupCount != null) {
+      TopGroups<T> result = new TopGroups<T>(groupSort.getSort(),
+                              docSort == null ? null : docSort.getSort(),
+                              totalHitCount,
+                              totalGroupedHitCount,
+                              mergedGroupDocs);
+      return new TopGroups<T>(result, totalGroupCount);
+    } else {
+      return new TopGroups<T>(groupSort.getSort(),
+                              docSort == null ? null : docSort.getSort(),
+                              totalHitCount,
+                              totalGroupedHitCount,
+                              mergedGroupDocs);
+    }
   }
 }
diff --git a/lucene/src/java/org/apache/lucene/search/FieldComparator.java b/lucene/src/java/org/apache/lucene/search/FieldComparator.java
index eb7cffa..dfe3530 100644
--- a/lucene/src/java/org/apache/lucene/search/FieldComparator.java
+++ b/lucene/src/java/org/apache/lucene/search/FieldComparator.java
@@ -178,7 +178,17 @@
    *  if your values may sometimes be null */
   @SuppressWarnings("unchecked")
   public int compareValues(T first, T second) {
-    return ((Comparable<T>) first).compareTo(second);
+    if (first == null) {
+      if (second == null) {
+        return 0;
+      } else {
+        return -1;
+      }
+    } else if (second == null) {
+      return 1;
+    } else {
+      return ((Comparable<T>) first).compareTo(second);
+    }
   }
 
   /** Parses field's values as byte (using {@link
diff --git a/lucene/src/java/org/apache/lucene/search/TopDocs.java b/lucene/src/java/org/apache/lucene/search/TopDocs.java
index 2a9d08d..c375871 100644
--- a/lucene/src/java/org/apache/lucene/search/TopDocs.java
+++ b/lucene/src/java/org/apache/lucene/search/TopDocs.java
@@ -232,11 +232,8 @@
       assert queue.size() > 0;
       ShardRef ref = queue.pop();
       final ScoreDoc hit = shardHits[ref.shardIndex].scoreDocs[ref.hitIndex++];
-      if (sort == null) {
-        hits[hitUpto] = new ScoreDoc(hit.doc, hit.score, ref.shardIndex);
-      } else {
-        hits[hitUpto] = new FieldDoc(hit.doc, hit.score, ((FieldDoc) hit).fields, ref.shardIndex);
-      }
+      hit.shardIndex = ref.shardIndex;
+      hits[hitUpto] = hit;
 
       //System.out.println("  hitUpto=" + hitUpto);
       //System.out.println("    doc=" + hits[hitUpto].doc + " score=" + hits[hitUpto].score);
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 00982b6..af0feb1 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -36,6 +36,9 @@
   can be specified with a name in solrconfig.xml, and use hl.boundaryScanner=name
   parameter to specify the named <boundaryScanner/>. (koji)
 
+* SOLR-2066: Added support for distributed grouping.
+  (Martijn van Groningen, Jasper van Veghel, Matt Beaumont)
+
 
 Bug Fixes
 ----------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
index 17b7f2e..55e1043 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
@@ -20,10 +20,11 @@
 import org.apache.lucene.document.Field;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.queryParser.ParseException;
-import org.apache.lucene.search.FieldComparator;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.*;
+import org.apache.lucene.search.grouping.GroupDocs;
+import org.apache.lucene.search.grouping.SearchGroup;
+import org.apache.lucene.search.grouping.TopGroups;
+import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
@@ -35,6 +36,25 @@
 import org.apache.solr.schema.FieldType;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.search.*;
+import org.apache.solr.search.grouping.CommandHandler;
+import org.apache.solr.search.grouping.GroupingSpecification;
+import org.apache.solr.search.grouping.distributed.ShardRequestFactory;
+import org.apache.solr.search.grouping.distributed.ShardResponseProcessor;
+import org.apache.solr.search.grouping.distributed.command.QueryCommand;
+import org.apache.solr.search.grouping.distributed.command.SearchGroupsFieldCommand;
+import org.apache.solr.search.grouping.distributed.command.TopGroupsFieldCommand;
+import org.apache.solr.search.grouping.distributed.requestfactory.SearchGroupsRequestFactory;
+import org.apache.solr.search.grouping.distributed.requestfactory.StoredFieldsShardRequestFactory;
+import org.apache.solr.search.grouping.distributed.requestfactory.TopGroupsShardRequestFactory;
+import org.apache.solr.search.grouping.distributed.responseprocessor.SearchGroupShardResponseProcessor;
+import org.apache.solr.search.grouping.distributed.responseprocessor.StoredFieldsShardResponseProcessor;
+import org.apache.solr.search.grouping.distributed.responseprocessor.TopGroupsShardResponseProcessor;
+import org.apache.solr.search.grouping.distributed.shardresultserializer.SearchGroupsResultTransformer;
+import org.apache.solr.search.grouping.distributed.shardresultserializer.TopGroupsResultTransformer;
+import org.apache.solr.search.grouping.endresulttransformer.EndResultTransformer;
+import org.apache.solr.search.grouping.endresulttransformer.GroupedEndResultTransformer;
+import org.apache.solr.search.grouping.endresulttransformer.MainEndResultTransformer;
+import org.apache.solr.search.grouping.endresulttransformer.SimpleEndResultTransformer;
 import org.apache.solr.util.SolrPluginUtils;
 
 import java.io.IOException;
@@ -119,6 +139,46 @@
     if(shards_start != null) {
       rb.shards_start = Integer.parseInt(shards_start);
     }
+
+    boolean grouping = params.getBool(GroupParams.GROUP, false);
+    if (!grouping) {
+      return;
+    }
+    SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
+    SolrIndexSearcher searcher = rb.req.getSearcher();
+    GroupingSpecification groupingSpec = new GroupingSpecification();
+    rb.setGroupingSpec(groupingSpec);
+
+    //TODO: move weighting of sort
+    Sort groupSort = searcher.weightSort(cmd.getSort());
+    // groupSort defaults to sort
+    String groupSortStr = params.get(GroupParams.GROUP_SORT);
+    if (groupSort == null) {
+      groupSort = new Sort();
+    }
+    //TODO: move weighting of sort
+    Sort sortWithinGroup = groupSortStr == null ?  groupSort : searcher.weightSort(QueryParsing.parseSort(groupSortStr, req));
+    groupingSpec.setSortWithinGroup(sortWithinGroup);
+    groupingSpec.setGroupSort(groupSort);
+
+    String formatStr = params.get(GroupParams.GROUP_FORMAT, Grouping.Format.grouped.name());
+    Grouping.Format responseFormat;
+    try {
+       responseFormat = Grouping.Format.valueOf(formatStr);
+    } catch (IllegalArgumentException e) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, String.format("Illegal %s parameter", GroupParams.GROUP_FORMAT));
+    }
+    groupingSpec.setResponseFormat(responseFormat);
+
+    groupingSpec.setFields(params.getParams(GroupParams.GROUP_FIELD));
+    groupingSpec.setQueries(params.getParams(GroupParams.GROUP_QUERY));
+    groupingSpec.setGroupOffset(params.getInt(GroupParams.GROUP_OFFSET, 0));
+    groupingSpec.setGroupLimit(params.getInt(GroupParams.GROUP_LIMIT, 1));
+    groupingSpec.setOffset(rb.getSortSpec().getOffset());
+    groupingSpec.setLimit(rb.getSortSpec().getCount());
+    groupingSpec.setIncludeGroupCount(params.getBool(GroupParams.GROUP_TOTAL_COUNT, false));
+    groupingSpec.setMain(params.getBool(GroupParams.GROUP_MAIN, false));
+    groupingSpec.setNeedScore((cmd.getFlags() & SolrIndexSearcher.GET_SCORES) != 0);
   }
 
   /**
@@ -181,75 +241,131 @@
     //
     // grouping / field collapsing
     //
-    boolean doGroup = params.getBool(GroupParams.GROUP, false);
-    if (doGroup) {
+    GroupingSpecification groupingSpec = rb.getGroupingSpec();
+    if (groupingSpec != null) {
       try {
-        int maxDocsPercentageToCache = params.getInt(GroupParams.GROUP_CACHE_PERCENTAGE, 0);
-        boolean cacheSecondPassSearch = maxDocsPercentageToCache >= 1 && maxDocsPercentageToCache <= 100;
-        String[] fields = params.getParams(GroupParams.GROUP_FIELD);
-        String[] queries = params.getParams(GroupParams.GROUP_QUERY);
-        String groupSortStr = params.get(GroupParams.GROUP_SORT);
-        boolean main = params.getBool(GroupParams.GROUP_MAIN, false);
-        boolean truncateGroups = params.getBool(GroupParams.GROUP_TRUNCATE, false);
+        boolean needScores = (cmd.getFlags() & SolrIndexSearcher.GET_SCORES) != 0;
+        if (params.getBool("group.distibuted.first", false)) {
+          CommandHandler.Builder topsGroupsActionBuilder = new CommandHandler.Builder()
+              .setQueryCommand(cmd)
+              .setNeedDocSet(false) // Order matters here
+              .setSearcher(searcher);
 
-        String formatStr = params.get(GroupParams.GROUP_FORMAT, Grouping.Format.grouped.name());
-        Grouping.Format defaultFormat;
-        try {
-          defaultFormat = Grouping.Format.valueOf(formatStr);
-        } catch (IllegalArgumentException e) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, String.format("Illegal %s parameter", GroupParams.GROUP_FORMAT));
+          for (String field : groupingSpec.getFields()) {
+            topsGroupsActionBuilder.addCommandField(new SearchGroupsFieldCommand.Builder()
+                .setField(searcher.getSchema().getField(field))
+                .setGroupSort(groupingSpec.getGroupSort())
+                .setTopNGroups(cmd.getOffset() + cmd.getLen())
+                .build()
+            );
+          }
+
+          CommandHandler commandHandler = topsGroupsActionBuilder.build();
+          commandHandler.execute();
+          SearchGroupsResultTransformer serializer = new SearchGroupsResultTransformer(searcher);
+          rsp.add("firstPhase", commandHandler.processResult(result, serializer));
+          rb.setResult(result);
+          return;
+        } else if (params.getBool("group.distibuted.second", false)) {
+          CommandHandler.Builder secondPhaseBuilder = new CommandHandler.Builder()
+              .setQueryCommand(cmd)
+              .setSearcher(searcher);
+
+          for (String field : groupingSpec.getFields()) {
+            String[] topGroupsParam = params.getParams("group.topgroups." + field);
+            if (topGroupsParam == null) {
+              continue;
+            }
+
+            List<SearchGroup<String>> topGroups = new ArrayList<SearchGroup<String>>(topGroupsParam.length);
+            for (String topGroup : topGroupsParam) {
+              SearchGroup<String> searchGroup = new SearchGroup<String>();
+              if (!topGroup.equals(TopGroupsShardRequestFactory.GROUP_NULL_VALUE)) {
+                searchGroup.groupValue = searcher.getSchema().getField(field).getType().readableToIndexed(topGroup);
+              }
+              topGroups.add(searchGroup);
+            }
+
+            secondPhaseBuilder.addCommandField(
+                new TopGroupsFieldCommand.Builder()
+                    .setField(searcher.getSchema().getField(field))
+                    .setGroupSort(groupingSpec.getGroupSort())
+                    .setSortWithinGroup(groupingSpec.getSortWithinGroup())
+                    .setFirstPhaseGroups(topGroups)
+                    .setMaxDocPerGroup(groupingSpec.getGroupOffset() + groupingSpec.getGroupLimit())
+                    .setNeedScores(needScores)
+                    .setNeedMaxScore(needScores)
+                    .setNeedGroupCount(groupingSpec.isIncludeGroupCount())
+                    .build()
+            );
+          }
+
+          for (String query : groupingSpec.getQueries()) {
+            secondPhaseBuilder.addCommandField(new QueryCommand.Builder()
+                .setDocsToCollect(groupingSpec.getOffset() + groupingSpec.getLimit())
+                .setSort(groupingSpec.getGroupSort())
+                .setQuery(query, rb.req)
+                .setDocSet(searcher)
+                .build()
+            );
+          }
+
+          CommandHandler commandHandler = secondPhaseBuilder.build();
+          commandHandler.execute();
+          TopGroupsResultTransformer serializer = new TopGroupsResultTransformer(rb);
+          rsp.add("secondPhase", commandHandler.processResult(result, serializer));
+          rb.setResult(result);
+          return;
         }
 
-        boolean includeTotalGroupCount = params.getBool(GroupParams.GROUP_TOTAL_COUNT, false);
-        Grouping.TotalCount defaultTotalCount = includeTotalGroupCount ? Grouping.TotalCount.grouped : Grouping.TotalCount.ungrouped;
-        Sort sort = searcher.weightSort(cmd.getSort());
-        // groupSort defaults to sort
-        Sort groupSort = groupSortStr == null ?  sort : searcher.weightSort(QueryParsing.parseSort(groupSortStr, req));
-
+        int maxDocsPercentageToCache = params.getInt(GroupParams.GROUP_CACHE_PERCENTAGE, 0);
+        boolean cacheSecondPassSearch = maxDocsPercentageToCache >= 1 && maxDocsPercentageToCache <= 100;
+        boolean truncateGroups = params.getBool(GroupParams.GROUP_TRUNCATE, false);
+        Grouping.TotalCount defaultTotalCount = groupingSpec.isIncludeGroupCount() ?
+            Grouping.TotalCount.grouped : Grouping.TotalCount.ungrouped;
         int limitDefault = cmd.getLen(); // this is normally from "rows"
-        int groupOffsetDefault = params.getInt(GroupParams.GROUP_OFFSET, 0);
-        int docsPerGroupDefault = params.getInt(GroupParams.GROUP_LIMIT, 1);
-
-        Grouping grouping = new Grouping(searcher, result, cmd, cacheSecondPassSearch, maxDocsPercentageToCache, main);
-        grouping.setSort(sort)
-            .setGroupSort(groupSort)
-            .setDefaultFormat(defaultFormat)
+        Grouping grouping =
+            new Grouping(searcher, result, cmd, cacheSecondPassSearch, maxDocsPercentageToCache, groupingSpec.isMain());
+        grouping.setSort(groupingSpec.getGroupSort())
+            .setGroupSort(groupingSpec.getSortWithinGroup())
+            .setDefaultFormat(groupingSpec.getResponseFormat())
             .setLimitDefault(limitDefault)
             .setDefaultTotalCount(defaultTotalCount)
-            .setDocsPerGroupDefault(docsPerGroupDefault)
-            .setGroupOffsetDefault(groupOffsetDefault)
+            .setDocsPerGroupDefault(groupingSpec.getGroupLimit())
+            .setGroupOffsetDefault(groupingSpec.getGroupOffset())
             .setGetGroupedDocSet(truncateGroups);
 
-        if (fields != null) {
-          for (String field : fields) {
+        if (groupingSpec.getFields() != null) {
+          for (String field : groupingSpec.getFields()) {
             grouping.addFieldCommand(field, rb.req);
           }
         }
 
-        if (queries != null) {
-          for (String groupByStr : queries) {
+        if (groupingSpec.getQueries() != null) {
+          for (String groupByStr : groupingSpec.getQueries()) {
             grouping.addQueryCommand(groupByStr, rb.req);
           }
         }
 
-        if (rb.doHighlights || rb.isDebug()) {
+        if (rb.doHighlights || rb.isDebug() || params.getBool(MoreLikeThisParams.MLT, false)) {
           // we need a single list of the returned docs
           cmd.setFlags(SolrIndexSearcher.GET_DOCLIST);
         }
 
         grouping.execute();
-        rb.setResult( result );
         if (grouping.isSignalCacheWarning()) {
           rsp.add(
               "cacheWarning",
               String.format("Cache limit of %d percent relative to maxdoc has exceeded. Please increase cache size or disable caching.", maxDocsPercentageToCache)
           );
         }
-        rsp.add("grouped", result.groupedResults);
+        rb.setResult(result);
+
         if (grouping.mainResult != null) {
           rsp.add("response", grouping.mainResult);
           rsp.getToLog().add("hits", grouping.mainResult.matches());
-        } else {
+        } else if (!grouping.getCommands().isEmpty()) { // Can never be empty since grouping.execute() checks for this.
+          rsp.add("grouped", result.groupedResults);
           rsp.getToLog().add("hits", grouping.getCommands().get(0).getMatches());
         }
         return;
@@ -280,7 +396,7 @@
     if(fsv){
       Sort sort = searcher.weightSort(rb.getSortSpec().getSort());
       SortField[] sortFields = sort==null ? new SortField[]{SortField.FIELD_SCORE} : sort.getSort();
-      NamedList sortVals = new NamedList(); // order is important for the sort fields
+      NamedList<List> sortVals = new NamedList<List>(); // order is important for the sort fields
       Field field = new Field("dummy", "", Field.Store.YES, Field.Index.NO); // a dummy Field
 
       SolrIndexReader reader = searcher.getReader();
@@ -364,6 +480,48 @@
 
   @Override  
   public int distributedProcess(ResponseBuilder rb) throws IOException {
+    if (rb.grouping()) {
+      return groupedDistributedProcess(rb);
+    } else {
+      return regularDistributedProcess(rb);
+    }
+  }
+
+  private int groupedDistributedProcess(ResponseBuilder rb) {
+    int nextStage = ResponseBuilder.STAGE_DONE;
+    ShardRequestFactory shardRequestFactory = null;
+
+    if (rb.stage < ResponseBuilder.STAGE_PARSE_QUERY) {
+      nextStage = ResponseBuilder.STAGE_PARSE_QUERY;
+    } else if (rb.stage == ResponseBuilder.STAGE_PARSE_QUERY) {
+      createDistributedIdf(rb);
+      nextStage = ResponseBuilder.STAGE_TOP_GROUPS;
+    } else if (rb.stage < ResponseBuilder.STAGE_TOP_GROUPS) {
+      nextStage = ResponseBuilder.STAGE_TOP_GROUPS;
+    } else if (rb.stage == ResponseBuilder.STAGE_TOP_GROUPS) {
+      shardRequestFactory = new SearchGroupsRequestFactory();
+      nextStage = ResponseBuilder.STAGE_EXECUTE_QUERY;
+    } else if (rb.stage < ResponseBuilder.STAGE_EXECUTE_QUERY) {
+      nextStage = ResponseBuilder.STAGE_EXECUTE_QUERY;
+    } else if (rb.stage == ResponseBuilder.STAGE_EXECUTE_QUERY) {
+      shardRequestFactory = new TopGroupsShardRequestFactory();
+      nextStage = ResponseBuilder.STAGE_GET_FIELDS;
+    } else if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) {
+      nextStage = ResponseBuilder.STAGE_GET_FIELDS;
+    } else if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
+      shardRequestFactory = new StoredFieldsShardRequestFactory();
+      nextStage = ResponseBuilder.STAGE_DONE;
+    }
+
+    if (shardRequestFactory != null) {
+      for (ShardRequest shardRequest : shardRequestFactory.constructRequest(rb)) {
+        rb.addRequest(this, shardRequest);
+      }
+    }
+    return nextStage;
+  }
+
+  private int regularDistributedProcess(ResponseBuilder rb) {
     if (rb.stage < ResponseBuilder.STAGE_PARSE_QUERY)
       return ResponseBuilder.STAGE_PARSE_QUERY;
     if (rb.stage == ResponseBuilder.STAGE_PARSE_QUERY) {
@@ -383,33 +541,101 @@
     return ResponseBuilder.STAGE_DONE;
   }
 
-
   @Override
   public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {
+    if (rb.grouping()) {
+      handleGroupedResponses(rb, sreq);
+    } else {
+      handleRegularResponses(rb, sreq);
+    }
+  }
+
+  private void handleGroupedResponses(ResponseBuilder rb, ShardRequest sreq) {
+    ShardResponseProcessor responseProcessor = null;
+    if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_GROUPS) != 0) {
+      responseProcessor = new SearchGroupShardResponseProcessor();
+    } else if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0) {
+      responseProcessor = new TopGroupsShardResponseProcessor();
+    } else if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) {
+      responseProcessor = new StoredFieldsShardResponseProcessor();
+    }
+
+    if (responseProcessor != null) {
+      responseProcessor.process(rb, sreq);
+    }
+  }
+
+  private void handleRegularResponses(ResponseBuilder rb, ShardRequest sreq) {
     if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0) {
       mergeIds(rb, sreq);
     }
 
     if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) {
       returnFields(rb, sreq);
-      return;
     }
   }
 
   @Override
   public void finishStage(ResponseBuilder rb) {
-    if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
-      // We may not have been able to retrieve all the docs due to an
-      // index change.  Remove any null documents.
-      for (Iterator<SolrDocument> iter = rb._responseDocs.iterator(); iter.hasNext();) {
-        if (iter.next() == null) {
-          iter.remove();
-          rb._responseDocs.setNumFound(rb._responseDocs.getNumFound()-1);
-        }        
+    if (rb.stage != ResponseBuilder.STAGE_GET_FIELDS) {
+      return;
+    }
+    if (rb.grouping()) {
+      groupedFinishStage(rb);
+    } else {
+      regularFinishStage(rb);
+    }
+  }
+
+  private static final EndResultTransformer MAIN_END_RESULT_TRANSFORMER = new MainEndResultTransformer();
+  private static final EndResultTransformer SIMPLE_END_RESULT_TRANSFORMER = new SimpleEndResultTransformer();
+
+  @SuppressWarnings("unchecked")
+  private void groupedFinishStage(final ResponseBuilder rb) {
+    // To have same response as non-distributed request.
+    GroupingSpecification groupSpec = rb.getGroupingSpec();
+    if (rb.mergedTopGroups.isEmpty()) {
+      for (String field : groupSpec.getFields()) {
+        rb.mergedTopGroups.put(field, new TopGroups(null, null, 0, 0, new GroupDocs[]{}));
+      }
+      rb.resultIds = new HashMap<Object, ShardDoc>();
+    }
+
+    EndResultTransformer.SolrDocumentSource solrDocumentSource = new EndResultTransformer.SolrDocumentSource() {
+
+      public SolrDocument retrieve(ScoreDoc doc) {
+        ShardDoc solrDoc = (ShardDoc) doc;
+        return rb.retrievedDocuments.get(solrDoc.id);
       }
 
-      rb.rsp.add("response", rb._responseDocs);
+    };
+    EndResultTransformer endResultTransformer;
+    if (groupSpec.isMain()) {
+      endResultTransformer = MAIN_END_RESULT_TRANSFORMER;
+    } else if (Grouping.Format.grouped == groupSpec.getResponseFormat()) {
+      endResultTransformer = new GroupedEndResultTransformer(rb.req.getSearcher());
+    } else if (Grouping.Format.simple == groupSpec.getResponseFormat() && !groupSpec.isMain()) {
+      endResultTransformer = SIMPLE_END_RESULT_TRANSFORMER;
+    } else {
+      return;
     }
+    Map<String, Object> combinedMap = new LinkedHashMap<String, Object>();
+    combinedMap.putAll(rb.mergedTopGroups);
+    combinedMap.putAll(rb.mergedQueryCommandResults);
+    endResultTransformer.transform(combinedMap, rb.rsp, rb.getGroupingSpec(), solrDocumentSource);
+  }
+
+  private void regularFinishStage(ResponseBuilder rb) {
+    // We may not have been able to retrieve all the docs due to an
+    // index change.  Remove any null documents.
+    for (Iterator<SolrDocument> iter = rb._responseDocs.iterator(); iter.hasNext();) {
+      if (iter.next() == null) {
+        iter.remove();
+        rb._responseDocs.setNumFound(rb._responseDocs.getNumFound()-1);
+      }
+    }
+
+    rb.rsp.add("response", rb._responseDocs);
   }
 
 
@@ -547,7 +773,7 @@
 
       Map<Object,ShardDoc> resultIds = new HashMap<Object,ShardDoc>();
       for (int i=resultSize-1; i>=0; i--) {
-        ShardDoc shardDoc = (ShardDoc)queue.pop();
+        ShardDoc shardDoc = queue.pop();
         shardDoc.positionInResponse = i;
         // Need the toString() for correlation with other lists that must
         // be strings (like keys in highlighting, explain, etc)
diff --git a/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java b/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java
index bd9a36e..3a0bda5 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java
@@ -18,6 +18,10 @@
 package org.apache.solr.handler.component;
 
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.grouping.SearchGroup;
+import org.apache.lucene.search.grouping.TopGroups;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.RTimer;
@@ -28,7 +32,11 @@
 import org.apache.solr.search.QParser;
 import org.apache.solr.search.SortSpec;
 import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.grouping.GroupingSpecification;
+import org.apache.solr.search.grouping.distributed.command.QueryCommandResult;
 
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -57,6 +65,7 @@
   private Query query = null;
   private List<Query> filters = null;
   private SortSpec sortSpec = null;
+  private GroupingSpecification groupingSpec;
 
   private DocListAndSet results = null;
   private NamedList<Object> debugInfo = null;
@@ -87,6 +96,7 @@
 
   public static int STAGE_START           = 0;
   public static int STAGE_PARSE_QUERY     = 1000;
+  public static int STAGE_TOP_GROUPS = 1500;
   public static int STAGE_EXECUTE_QUERY   = 2000;
   public static int STAGE_GET_FIELDS      = 3000;
   public static int STAGE_DONE            = Integer.MAX_VALUE;
@@ -122,7 +132,7 @@
 
   public GlobalCollectionStat globalCollectionStat;
 
-  Map<Object, ShardDoc> resultIds;
+  public Map<Object, ShardDoc> resultIds;
   // Maps uniqueKeyValue to ShardDoc, which may be used to
   // determine order of the doc or uniqueKey in the final
   // returned sequence.
@@ -135,6 +145,13 @@
   StatsInfo _statsInfo;
   TermsComponent.TermsHelper _termsHelper;
 
+  // Context fields for grouping
+  public final Map<String, Collection<SearchGroup<String>>> mergedSearchGroups = new HashMap<String, Collection<SearchGroup<String>>>();
+  public final Map<String, Map<SearchGroup<String>, String>> searchGroupToShard = new HashMap<String, Map<SearchGroup<String>, String>>();
+  public final Map<String, TopGroups<String>> mergedTopGroups = new HashMap<String, TopGroups<String>>();
+  public final Map<String, QueryCommandResult> mergedQueryCommandResults = new HashMap<String, QueryCommandResult>();
+  public final Map<Object, SolrDocument> retrievedDocuments = new HashMap<Object, SolrDocument>();
+
   /**
    * Utility function to add debugging info.  This will make sure a valid
    * debugInfo exists before adding to it.
@@ -246,6 +263,18 @@
     this.sortSpec = sort;
   }
 
+  public GroupingSpecification getGroupingSpec() {
+    return groupingSpec;
+  }
+
+  public void setGroupingSpec(GroupingSpecification groupingSpec) {
+    this.groupingSpec = groupingSpec;
+  }
+
+  public boolean grouping() {
+    return groupingSpec != null;
+  }
+
   public RTimer getTimer() {
     return timer;
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/component/ShardDoc.java b/solr/core/src/java/org/apache/solr/handler/component/ShardDoc.java
index 5b17b1b..c2cb3a8 100755
--- a/solr/core/src/java/org/apache/solr/handler/component/ShardDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/ShardDoc.java
@@ -16,19 +16,20 @@
  */
 package org.apache.solr.handler.component;
 
-import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.FieldComparatorSource;
+import org.apache.lucene.search.FieldDoc;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.util.PriorityQueue;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.search.MissingStringLastComparatorSource;
 
 import java.text.Collator;
-import java.util.Comparator;
-import java.util.Locale;
-import java.util.List;
 import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
 
-public class ShardDoc {
+public class ShardDoc extends FieldDoc {
   public String shard;
   public String shardAddress;  // TODO
   
@@ -37,7 +38,7 @@
     // to short-circuit comparisons if the shard is equal, and can
     // also be used to break ties within the same shard.
 
-  Object id;
+  public Object id;
     // this is currently the uniqueKeyField but
     // may be replaced with internal docid in a future release.
 
@@ -53,9 +54,36 @@
   // but we shouldn't expose uniqueKey (have a map by it) until the stored-field
   // retrieval stage.
 
-  int positionInResponse;
+  public int positionInResponse;
   // the ordinal position in the merged response arraylist  
 
+  public ShardDoc(float score, Object[] fields, Object uniqueId, String shard) {
+      super(-1, score, fields);
+      this.id = uniqueId;
+      this.shard = shard;
+  }
+
+  public ShardDoc() {
+    super(-1, Float.NaN);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ShardDoc shardDoc = (ShardDoc) o;
+
+    if (id != null ? !id.equals(shardDoc.id) : shardDoc.id != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return id != null ? id.hashCode() : 0;
+  }
+
   @Override
   public String toString(){
     return "id="+id
@@ -70,7 +98,7 @@
 
 
 // used by distributed search to merge results.
-class ShardFieldSortedHitQueue extends PriorityQueue {
+class ShardFieldSortedHitQueue extends PriorityQueue<ShardDoc> {
 
   /** Stores a comparator corresponding to each field being sorted by */
   protected Comparator[] comparators;
@@ -112,10 +140,7 @@
   }
 
   @Override
-  protected boolean lessThan(Object objA, Object objB) {
-    ShardDoc docA = (ShardDoc)objA;
-    ShardDoc docB = (ShardDoc)objB;
-
+  protected boolean lessThan(ShardDoc docA, ShardDoc docB) {
     // If these docs are from the same shard, then the relative order
     // is how they appeared in the response from that shard.    
     if (docA.shard == docB.shard) {
@@ -286,4 +311,4 @@
     };
   }
 
-}
\ No newline at end of file
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/component/ShardRequest.java b/solr/core/src/java/org/apache/solr/handler/component/ShardRequest.java
index f1cb7d3..06e87e5 100755
--- a/solr/core/src/java/org/apache/solr/handler/component/ShardRequest.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/ShardRequest.java
@@ -16,11 +16,11 @@
  */
 package org.apache.solr.handler.component;
 
+import org.apache.solr.common.params.ModifiableSolrParams;
+
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.solr.common.params.ModifiableSolrParams;
-
 
 // todo... when finalized make accessors
 public class ShardRequest {
@@ -37,6 +37,7 @@
   public final static int PURPOSE_GET_DEBUG       =0x100;
   public final static int PURPOSE_GET_STATS       =0x200;
   public final static int PURPOSE_GET_TERMS       =0x400;
+  public final static int PURPOSE_GET_TOP_GROUPS  =0x800;
 
   public int purpose;  // the purpose of this request
 
diff --git a/solr/core/src/java/org/apache/solr/search/DocSetCollector.java b/solr/core/src/java/org/apache/solr/search/DocSetCollector.java
new file mode 100644
index 0000000..2893c13
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/DocSetCollector.java
@@ -0,0 +1,95 @@
+package org.apache.solr.search;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.util.OpenBitSet;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+
+public class DocSetCollector extends Collector {
+  int pos=0;
+  OpenBitSet bits;
+  final int maxDoc;
+  final int smallSetSize;
+  int base;
+
+  // in case there aren't that many hits, we may not want a very sparse
+  // bit array.  Optimistically collect the first few docs in an array
+  // in case there are only a few.
+  final int[] scratch;
+
+  public DocSetCollector(int smallSetSize, int maxDoc) {
+    this.smallSetSize = smallSetSize;
+    this.maxDoc = maxDoc;
+    this.scratch = new int[smallSetSize];
+  }
+
+  @Override
+  public void collect(int doc) throws IOException {
+    doc += base;
+    // optimistically collect the first docs in an array
+    // in case the total number will be small enough to represent
+    // as a small set like SortedIntDocSet instead...
+    // Storing in this array will be quicker to convert
+    // than scanning through a potentially huge bit vector.
+    // FUTURE: when search methods all start returning docs in order, maybe
+    // we could have a ListDocSet() and use the collected array directly.
+    if (pos < scratch.length) {
+      scratch[pos]=doc;
+    } else {
+      // this conditional could be removed if BitSet was preallocated, but that
+      // would take up more memory, and add more GC time...
+      if (bits==null) bits = new OpenBitSet(maxDoc);
+      bits.fastSet(doc);
+    }
+
+    pos++;
+  }
+
+  public DocSet getDocSet() {
+    if (pos<=scratch.length) {
+      // assumes docs were collected in sorted order!
+      return new SortedIntDocSet(scratch, pos);
+    } else {
+      // set the bits for ids that were collected in the array
+      for (int i=0; i<scratch.length; i++) bits.fastSet(scratch[i]);
+      return new BitDocSet(bits,pos);
+    }
+  }
+
+  @Override
+  public void setScorer(Scorer scorer) throws IOException {
+  }
+
+  @Override
+  public void setNextReader(IndexReader indexReader, int docBase) throws IOException {
+    this.base = docBase;
+  }
+
+  @Override
+  public boolean acceptsDocsOutOfOrder() {
+    return false;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/search/DocSetDelegateCollector.java b/solr/core/src/java/org/apache/solr/search/DocSetDelegateCollector.java
new file mode 100644
index 0000000..34df14e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/DocSetDelegateCollector.java
@@ -0,0 +1,84 @@
+package org.apache.solr.search;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.util.OpenBitSet;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public class DocSetDelegateCollector extends DocSetCollector {
+  final Collector collector;
+
+  public DocSetDelegateCollector(int smallSetSize, int maxDoc, Collector collector) {
+    super(smallSetSize, maxDoc);
+    this.collector = collector;
+  }
+
+  @Override
+  public void collect(int doc) throws IOException {
+    collector.collect(doc);
+
+    doc += base;
+    // optimistically collect the first docs in an array
+    // in case the total number will be small enough to represent
+    // as a small set like SortedIntDocSet instead...
+    // Storing in this array will be quicker to convert
+    // than scanning through a potentially huge bit vector.
+    // FUTURE: when search methods all start returning docs in order, maybe
+    // we could have a ListDocSet() and use the collected array directly.
+    if (pos < scratch.length) {
+      scratch[pos]=doc;
+    } else {
+      // this conditional could be removed if BitSet was preallocated, but that
+      // would take up more memory, and add more GC time...
+      if (bits==null) bits = new OpenBitSet(maxDoc);
+      bits.fastSet(doc);
+    }
+
+    pos++;
+  }
+
+  @Override
+  public DocSet getDocSet() {
+    if (pos<=scratch.length) {
+      // assumes docs were collected in sorted order!
+      return new SortedIntDocSet(scratch, pos);
+    } else {
+      // set the bits for ids that were collected in the array
+      for (int i=0; i<scratch.length; i++) bits.fastSet(scratch[i]);
+      return new BitDocSet(bits,pos);
+    }
+  }
+
+  @Override
+  public void setScorer(Scorer scorer) throws IOException {
+    collector.setScorer(scorer);
+  }
+
+  @Override
+  public void setNextReader(IndexReader indexReader, int docBase) throws IOException {
+    collector.setNextReader(indexReader, docBase);
+    this.base = docBase;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/search/DocSetHitCollector.java b/solr/core/src/java/org/apache/solr/search/DocSetHitCollector.java
deleted file mode 100644
index ae5a1d0..0000000
--- a/solr/core/src/java/org/apache/solr/search/DocSetHitCollector.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.search;
-
-import org.apache.lucene.search.Collector;
-import org.apache.lucene.search.Scorer;
-import org.apache.lucene.util.OpenBitSet;
-import org.apache.lucene.index.IndexReader;
-
-import java.io.IOException;
-
-/**
- * @version $Id$
- */
-
-class DocSetCollector extends Collector {
-  int pos=0;
-  OpenBitSet bits;
-  final int maxDoc;
-  final int smallSetSize;
-  int base;
-
-  // in case there aren't that many hits, we may not want a very sparse
-  // bit array.  Optimistically collect the first few docs in an array
-  // in case there are only a few.
-  final int[] scratch;
-
-  DocSetCollector(int smallSetSize, int maxDoc) {
-    this.smallSetSize = smallSetSize;
-    this.maxDoc = maxDoc;
-    this.scratch = new int[smallSetSize];
-  }
-  @Override
-  public void collect(int doc) throws IOException {
-    doc += base;
-    // optimistically collect the first docs in an array
-    // in case the total number will be small enough to represent
-    // as a small set like SortedIntDocSet instead...
-    // Storing in this array will be quicker to convert
-    // than scanning through a potentially huge bit vector.
-    // FUTURE: when search methods all start returning docs in order, maybe
-    // we could have a ListDocSet() and use the collected array directly.
-    if (pos < scratch.length) {
-      scratch[pos]=doc;
-    } else {
-      // this conditional could be removed if BitSet was preallocated, but that
-      // would take up more memory, and add more GC time...
-      if (bits==null) bits = new OpenBitSet(maxDoc);
-      bits.fastSet(doc);
-    }
-
-    pos++;
-  }
-
-  public DocSet getDocSet() {
-    if (pos<=scratch.length) {
-      // assumes docs were collected in sorted order!     
-      return new SortedIntDocSet(scratch, pos);
-    } else {
-      // set the bits for ids that were collected in the array
-      for (int i=0; i<scratch.length; i++) bits.fastSet(scratch[i]);
-      return new BitDocSet(bits,pos);
-    }
-  }
-
-  @Override
-  public void setScorer(Scorer scorer) throws IOException {
-  }
-
-  @Override
-  public void setNextReader(IndexReader reader, int docBase) throws IOException {
-    this.base = docBase;
-  }
-
-  @Override
-  public boolean acceptsDocsOutOfOrder() {
-    return false;
-  }
-}
-
-class DocSetDelegateCollector extends DocSetCollector {
-  final Collector collector;
-
-  DocSetDelegateCollector(int smallSetSize, int maxDoc, Collector collector) {
-    super(smallSetSize, maxDoc);
-    this.collector = collector;
-  }
-
-  @Override
-  public void collect(int doc) throws IOException {
-    collector.collect(doc);
-
-    doc += base;
-    // optimistically collect the first docs in an array
-    // in case the total number will be small enough to represent
-    // as a small set like SortedIntDocSet instead...
-    // Storing in this array will be quicker to convert
-    // than scanning through a potentially huge bit vector.
-    // FUTURE: when search methods all start returning docs in order, maybe
-    // we could have a ListDocSet() and use the collected array directly.
-    if (pos < scratch.length) {
-      scratch[pos]=doc;
-    } else {
-      // this conditional could be removed if BitSet was preallocated, but that
-      // would take up more memory, and add more GC time...
-      if (bits==null) bits = new OpenBitSet(maxDoc);
-      bits.fastSet(doc);
-    }
-
-    pos++;
-  }
-
-  @Override
-  public DocSet getDocSet() {
-    if (pos<=scratch.length) {
-      // assumes docs were collected in sorted order!
-      return new SortedIntDocSet(scratch, pos);
-    } else {
-      // set the bits for ids that were collected in the array
-      for (int i=0; i<scratch.length; i++) bits.fastSet(scratch[i]);
-      return new BitDocSet(bits,pos);
-    }
-  }
-
-  @Override
-  public void setScorer(Scorer scorer) throws IOException {
-    collector.setScorer(scorer);
-  }
-
-  @Override
-  public void setNextReader(IndexReader reader, int docBase) throws IOException {
-    collector.setNextReader(reader, docBase);
-    this.base = docBase;
-  }
-}
diff --git a/solr/core/src/java/org/apache/solr/search/Grouping.java b/solr/core/src/java/org/apache/solr/search/Grouping.java
index 1f1bd48..fa39c84 100644
--- a/solr/core/src/java/org/apache/solr/search/Grouping.java
+++ b/solr/core/src/java/org/apache/solr/search/Grouping.java
@@ -19,7 +19,6 @@
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.lucene.document.Fieldable;
-import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.queryParser.ParseException;
 import org.apache.lucene.search.*;
 import org.apache.lucene.search.grouping.*;
@@ -35,6 +34,7 @@
 import org.apache.solr.search.function.OrdFieldSource;
 import org.apache.solr.search.function.ReverseOrdFieldSource;
 import org.apache.solr.search.function.ValueSource;
+import org.apache.solr.search.grouping.collector.FilterCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -774,7 +774,7 @@
      * {@inheritDoc}
      */
     protected void finish() throws IOException {
-      TopDocsCollector topDocsCollector = (TopDocsCollector) collector.collector;
+      TopDocsCollector topDocsCollector = (TopDocsCollector) collector.getDelegate();
       TopDocs topDocs = topDocsCollector.topDocs();
       GroupDocs<String> groupDocs = new GroupDocs<String>(topDocs.getMaxScore(), topDocs.totalHits, topDocs.scoreDocs, query.toString(), null);
       if (main) {
@@ -789,43 +789,7 @@
      * {@inheritDoc}
      */
     public int getMatches() {
-      return collector.matches;
-    }
-  }
-
-  /**
-   * A collector that filters incoming doc ids that are not in the filter
-   */
-  static class FilterCollector extends Collector {
-
-    final DocSet filter;
-    final Collector collector;
-    int docBase;
-    int matches;
-
-    public FilterCollector(DocSet filter, Collector collector) throws IOException {
-      this.filter = filter;
-      this.collector = collector;
-    }
-
-    public void setScorer(Scorer scorer) throws IOException {
-      collector.setScorer(scorer);
-    }
-
-    public void collect(int doc) throws IOException {
-      matches++;
-      if (filter.exists(doc + docBase)) {
-        collector.collect(doc);
-      }
-    }
-
-    public void setNextReader(IndexReader reader, int docBase) throws IOException {
-      this.docBase = docBase;
-      collector.setNextReader(reader, docBase);
-    }
-
-    public boolean acceptsDocsOutOfOrder() {
-      return collector.acceptsDocsOutOfOrder();
+      return collector.getMatches();
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/search/MissingStringLastComparatorSource.java b/solr/core/src/java/org/apache/solr/search/MissingStringLastComparatorSource.java
index 036930d..8ce6a94 100644
--- a/solr/core/src/java/org/apache/solr/search/MissingStringLastComparatorSource.java
+++ b/solr/core/src/java/org/apache/solr/search/MissingStringLastComparatorSource.java
@@ -104,6 +104,21 @@
     }
 
     @Override
+    public int compareValues(String first, String second) {
+      if (first == null) {
+        if (second == null) {
+          return 0;
+        } else {
+          return 1;
+        }
+      } else if (second == null) {
+        return -1;
+      } else {
+        return first.compareTo(second);
+      }
+    }
+
+    @Override
     public int compareBottom(int doc) {
       assert bottomSlot != -1;
       int order = this.order[doc];
diff --git a/solr/core/src/java/org/apache/solr/search/QueryUtils.java b/solr/core/src/java/org/apache/solr/search/QueryUtils.java
index df7b665..3e7fd46 100755
--- a/solr/core/src/java/org/apache/solr/search/QueryUtils.java
+++ b/solr/core/src/java/org/apache/solr/search/QueryUtils.java
@@ -17,10 +17,10 @@
 
 package org.apache.solr.search;
 
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
 
 import java.util.List;
 
@@ -30,7 +30,7 @@
 public class QueryUtils {
 
   /** return true if this query has no positive components */
-  static boolean isNegative(Query q) {
+  public static boolean isNegative(Query q) {
     if (!(q instanceof BooleanQuery)) return false;
     BooleanQuery bq = (BooleanQuery)q;
     List<BooleanClause> clauses = bq.clauses();
@@ -51,7 +51,7 @@
    * @param q
    * @return
    */
-  static Query getAbs(Query q) {
+  public static Query getAbs(Query q) {
     if (q instanceof WrappedQuery) {
       Query subQ = ((WrappedQuery)q).getWrappedQuery();
       Query absSubQ = getAbs(subQ);
@@ -95,7 +95,7 @@
   /** Makes negative queries suitable for querying by
    * lucene.
    */
-  static Query makeQueryable(Query q) {
+  public static Query makeQueryable(Query q) {
     if (q instanceof WrappedQuery) {
       return makeQueryable(((WrappedQuery)q).getWrappedQuery());
     }
@@ -105,7 +105,7 @@
   /** Fixes a negative query by adding a MatchAllDocs query clause.
    * The query passed in *must* be a negative query.
    */
-  static Query fixNegativeQuery(Query q) {
+  public static Query fixNegativeQuery(Query q) {
     BooleanQuery newBq = (BooleanQuery)q.clone();
     newBq.add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST);
     return newBq;    
diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
index dc9e05a..c95be6f 100644
--- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
+++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
@@ -640,10 +640,10 @@
   private static Query matchAllDocsQuery = new MatchAllDocsQuery();
 
 
-  static class ProcessedFilter {
-    DocSet answer;  // the answer, if non-null
-    Filter filter;
-    DelegatingCollector postFilter;
+  public static class ProcessedFilter {
+    public DocSet answer;  // the answer, if non-null
+    public Filter filter;
+    public DelegatingCollector postFilter;
   }
 
 
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/Command.java b/solr/core/src/java/org/apache/solr/search/grouping/Command.java
new file mode 100644
index 0000000..bafb2eb
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/Command.java
@@ -0,0 +1,42 @@
+package org.apache.solr.search.grouping;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.Sort;
+import org.apache.solr.schema.SchemaField;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ *
+ */
+public interface Command<T> {
+
+  List<Collector> create() throws IOException;
+
+  T result();
+
+  String getKey();
+
+  Sort getGroupSort();
+
+  Sort getSortWithinGroup();
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/CommandHandler.java b/solr/core/src/java/org/apache/solr/search/grouping/CommandHandler.java
new file mode 100644
index 0000000..98e53e5
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/CommandHandler.java
@@ -0,0 +1,142 @@
+package org.apache.solr.search.grouping;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.Filter;
+import org.apache.lucene.search.MultiCollector;
+import org.apache.lucene.search.Query;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.search.*;
+import org.apache.solr.search.grouping.distributed.shardresultserializer.ShardResultTransformer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public class CommandHandler {
+
+  public static class Builder {
+
+    private SolrIndexSearcher.QueryCommand queryCommand;
+    private List<Command> commands = new ArrayList<Command>();
+    private SolrIndexSearcher searcher;
+    private boolean needDocSet = false;
+
+    public Builder setQueryCommand(SolrIndexSearcher.QueryCommand queryCommand) {
+      this.queryCommand = queryCommand;
+      this.needDocSet = (queryCommand.getFlags() & SolrIndexSearcher.GET_DOCSET) != 0;
+      return this;
+    }
+
+    public Builder addCommandField(Command commandField) {
+      commands.add(commandField);
+      return this;
+    }
+
+    public Builder setSearcher(SolrIndexSearcher searcher) {
+      this.searcher = searcher;
+      return this;
+    }
+
+    /**
+     * Sets whether to compute a {@link DocSet}.
+     * May override the value set by {@link #setQueryCommand(org.apache.solr.search.SolrIndexSearcher.QueryCommand)}.
+     *
+     * @param needDocSet Whether to compute a {@link DocSet}
+     * @return this
+     */
+    public Builder setNeedDocSet(boolean needDocSet) {
+      this.needDocSet = needDocSet;
+      return this;
+    }
+
+    public CommandHandler build() {
+      if (queryCommand == null || searcher == null) {
+        throw new IllegalStateException("All fields must be set");
+      }
+
+      return new CommandHandler(queryCommand, commands, searcher, needDocSet);
+    }
+
+  }
+
+  private final SolrIndexSearcher.QueryCommand queryCommand;
+  private final List<Command> commands;
+  private final SolrIndexSearcher searcher;
+  private final boolean needDocset;
+
+  private DocSet docSet;
+
+  private CommandHandler(SolrIndexSearcher.QueryCommand queryCommand,
+                         List<Command> commands,
+                         SolrIndexSearcher searcher,
+                         boolean needDocset) {
+    this.queryCommand = queryCommand;
+    this.commands = commands;
+    this.searcher = searcher;
+    this.needDocset = needDocset;
+  }
+
+  @SuppressWarnings("unchecked")
+  public void execute() throws IOException {
+    final int nrOfCommands = commands.size();
+    List<Collector> collectors = new ArrayList<Collector>(nrOfCommands);
+    for (Command command : commands) {
+      collectors.addAll(command.create());
+    }
+
+    SolrIndexSearcher.ProcessedFilter pf = searcher.getProcessedFilter(
+        queryCommand.getFilter(), queryCommand.getFilterList()
+    );
+    Filter luceneFilter = pf.filter;
+    Query query = QueryUtils.makeQueryable(queryCommand.getQuery());
+    Collector wrappedCollectors;
+    if (collectors.isEmpty()) {
+      wrappedCollectors = null;
+    } else {
+      wrappedCollectors = MultiCollector.wrap(collectors.toArray(new Collector[nrOfCommands]));
+    }
+
+    if (wrappedCollectors == null && needDocset) {
+      int maxDoc = searcher.maxDoc();
+      DocSetCollector docSetCollector = new DocSetCollector(maxDoc >> 6, maxDoc);
+      searcher.search(query, luceneFilter, docSetCollector);
+      docSet = docSetCollector.getDocSet();
+    } else if (needDocset) {
+      int maxDoc = searcher.maxDoc();
+      DocSetCollector docSetCollector = new DocSetDelegateCollector(maxDoc >> 6, maxDoc, wrappedCollectors);
+      searcher.search(query, luceneFilter, docSetCollector);
+      docSet = docSetCollector.getDocSet();
+    } else {
+      searcher.search(query, luceneFilter, wrappedCollectors);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public NamedList processResult(SolrIndexSearcher.QueryResult queryResult, ShardResultTransformer transformer) throws IOException {
+    if (needDocset) {
+      queryResult.setDocSet(docSet);
+    }
+    return transformer.transform(commands);
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/GroupingSpecification.java b/solr/core/src/java/org/apache/solr/search/grouping/GroupingSpecification.java
new file mode 100644
index 0000000..88ae885
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/GroupingSpecification.java
@@ -0,0 +1,145 @@
+package org.apache.solr.search.grouping;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.Sort;
+import org.apache.solr.search.Grouping;
+
+/**
+ * Encapsulates the grouping options like fields group sort and more specified by clients.
+ */
+public class GroupingSpecification {
+
+  private String[] fields = new String[]{};
+  private String[] queries = new String[]{};
+  private int offset;
+  private int limit;
+  private int groupOffset;
+  private int groupLimit;
+  private Sort groupSort;
+  private Sort sortWithinGroup;
+  private boolean includeGroupCount;
+  private boolean main;
+  private Grouping.Format responseFormat;
+  private boolean needScore;
+
+  public String[] getFields() {
+    return fields;
+  }
+
+  public void setFields(String[] fields) {
+    if (fields == null) {
+      return;
+    }
+
+    this.fields = fields;
+  }
+
+  public String[] getQueries() {
+    return queries;
+  }
+
+  public void setQueries(String[] queries) {
+    if (queries == null) {
+      return;
+    }
+
+    this.queries = queries;
+  }
+
+  public int getGroupOffset() {
+    return groupOffset;
+  }
+
+  public void setGroupOffset(int groupOffset) {
+    this.groupOffset = groupOffset;
+  }
+
+  public int getGroupLimit() {
+    return groupLimit;
+  }
+
+  public void setGroupLimit(int groupLimit) {
+    this.groupLimit = groupLimit;
+  }
+
+  public int getOffset() {
+    return offset;
+  }
+
+  public void setOffset(int offset) {
+    this.offset = offset;
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
+  public Sort getGroupSort() {
+    return groupSort;
+  }
+
+  public void setGroupSort(Sort groupSort) {
+    this.groupSort = groupSort;
+  }
+
+  public Sort getSortWithinGroup() {
+    return sortWithinGroup;
+  }
+
+  public void setSortWithinGroup(Sort sortWithinGroup) {
+    this.sortWithinGroup = sortWithinGroup;
+  }
+
+  public boolean isIncludeGroupCount() {
+    return includeGroupCount;
+  }
+
+  public void setIncludeGroupCount(boolean includeGroupCount) {
+    this.includeGroupCount = includeGroupCount;
+  }
+
+  public boolean isMain() {
+    return main;
+  }
+
+  public void setMain(boolean main) {
+    this.main = main;
+  }
+
+  public Grouping.Format getResponseFormat() {
+    return responseFormat;
+  }
+
+  public void setResponseFormat(Grouping.Format responseFormat) {
+    this.responseFormat = responseFormat;
+  }
+
+  public boolean isNeedScore() {
+    return needScore;
+  }
+
+  public void setNeedScore(boolean needScore) {
+    this.needScore = needScore;
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/collector/FilterCollector.java b/solr/core/src/java/org/apache/solr/search/grouping/collector/FilterCollector.java
new file mode 100644
index 0000000..f362c0d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/collector/FilterCollector.java
@@ -0,0 +1,74 @@
+package org.apache.solr.search.grouping.collector;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.Scorer;
+import org.apache.solr.search.DocSet;
+
+import java.io.IOException;
+
+/**
+ * A collector that filters incoming doc ids that are not in the filter
+ */
+public class FilterCollector extends Collector {
+
+  private final DocSet filter;
+  private final Collector delegate;
+  private int docBase;
+  private int matches;
+
+  public FilterCollector(DocSet filter, Collector delegate) throws IOException {
+    this.filter = filter;
+    this.delegate = delegate;
+  }
+
+  public void setScorer(Scorer scorer) throws IOException {
+    delegate.setScorer(scorer);
+  }
+
+  public void collect(int doc) throws IOException {
+    matches++;
+    if (filter.exists(doc + docBase)) {
+      delegate.collect(doc);
+    }
+  }
+
+  public void setNextReader(IndexReader indexReader, int docBase) throws IOException {
+    this.docBase = docBase;
+    delegate.setNextReader(indexReader, docBase);
+  }
+
+  public boolean acceptsDocsOutOfOrder() {
+    return delegate.acceptsDocsOutOfOrder();
+  }
+
+  public int getMatches() {
+    return matches;
+  }
+
+  /**
+   * Returns the delegate collector
+   *
+   * @return the delegate collector
+   */
+  public Collector getDelegate() {
+    return delegate;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/ShardRequestFactory.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/ShardRequestFactory.java
new file mode 100644
index 0000000..92726f5
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/ShardRequestFactory.java
@@ -0,0 +1,37 @@
+package org.apache.solr.search.grouping.distributed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.handler.component.ResponseBuilder;
+import org.apache.solr.handler.component.ShardRequest;
+
+/**
+ * Responsible for creating shard requests to the shards in the cluster to perform distributed grouping.
+ */
+public interface ShardRequestFactory {
+
+  /**
+   * Returns {@link ShardRequest} instances.
+   * Never returns <code>null</code>. If no {@link ShardRequest} instances are constructed an empty array is returned.
+   *
+   * @param rb The response builder
+   * @return {@link ShardRequest} instances
+   */
+  ShardRequest[] constructRequest(ResponseBuilder rb);
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/ShardResponseProcessor.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/ShardResponseProcessor.java
new file mode 100644
index 0000000..fe6fda3
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/ShardResponseProcessor.java
@@ -0,0 +1,37 @@
+package org.apache.solr.search.grouping.distributed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.handler.component.ResponseBuilder;
+import org.apache.solr.handler.component.ShardRequest;
+
+/**
+ * Responsible for processing shard responses.
+ */
+public interface ShardResponseProcessor {
+
+  /**
+   * Processes the responses from the specified shardRequest. The result is put into specific
+   * fields in the specified rb.
+   *
+   * @param rb The ResponseBuilder to put the merge result into
+   * @param shardRequest The shard request containing the responses from all shards.
+   */
+  void process(ResponseBuilder rb, ShardRequest shardRequest);
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/QueryCommand.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/QueryCommand.java
new file mode 100644
index 0000000..92613a9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/QueryCommand.java
@@ -0,0 +1,152 @@
+package org.apache.solr.search.grouping.distributed.command;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.queryParser.ParseException;
+import org.apache.lucene.search.*;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.search.DocSet;
+import org.apache.solr.search.QParser;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.grouping.Command;
+import org.apache.solr.search.grouping.collector.FilterCollector;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ *
+ */
+public class QueryCommand implements Command<QueryCommandResult> {
+
+  public static class Builder {
+
+    private Sort sort;
+    private String queryString;
+    private Query query;
+    private DocSet docSet;
+    private Integer docsToCollect;
+    private boolean needScores;
+
+    public Builder setSort(Sort sort) {
+      this.sort = sort;
+      return this;
+    }
+
+    public Builder setQuery(Query query) {
+      this.query = query;
+      return this;
+    }
+
+    /**
+     * Sets the group query from the specified groupQueryString.
+     * The groupQueryString is parsed into a query.
+     *
+     * @param groupQueryString The group query string to parse
+     * @param request The current request
+     * @return this
+     * @throws ParseException If parsing the groupQueryString failed
+     */
+    public Builder setQuery(String groupQueryString, SolrQueryRequest request) throws ParseException {
+      QParser parser = QParser.getParser(groupQueryString, null, request);
+      this.queryString = groupQueryString;
+      return setQuery(parser.getQuery());
+    }
+
+    public Builder setDocSet(DocSet docSet) {
+      this.docSet = docSet;
+      return this;
+    }
+
+    /**
+     * Sets the docSet based on the created {@link DocSet}
+     *
+     * @param searcher The searcher executing the
+     * @return
+     * @throws IOException
+     */
+    public Builder setDocSet(SolrIndexSearcher searcher) throws IOException {
+      return setDocSet(searcher.getDocSet(query));
+    }
+
+    public Builder setDocsToCollect(int docsToCollect) {
+      this.docsToCollect = docsToCollect;
+      return this;
+    }
+
+    public Builder setNeedScores(boolean needScores) {
+      this.needScores = needScores;
+      return this;
+    }
+
+    public QueryCommand build() {
+      if (sort == null || query == null || docSet == null || docsToCollect == null) {
+        throw new IllegalStateException("All fields must be set");
+      }
+
+      return new QueryCommand(sort, query, docsToCollect, needScores, docSet, queryString);
+    }
+
+  }
+
+  private final Sort sort;
+  private final Query query;
+  private final DocSet docSet;
+  private final int docsToCollect;
+  private final boolean needScores;
+  private final String queryString;
+
+  private TopDocsCollector collector;
+  private FilterCollector filterCollector;
+
+  private QueryCommand(Sort sort, Query query, int docsToCollect, boolean needScores, DocSet docSet, String queryString) {
+    this.sort = sort;
+    this.query = query;
+    this.docsToCollect = docsToCollect;
+    this.needScores = needScores;
+    this.docSet = docSet;
+    this.queryString = queryString;
+  }
+
+  public List<Collector> create() throws IOException {
+    if (sort == null || sort == Sort.RELEVANCE) {
+      collector = TopScoreDocCollector.create(docsToCollect, true);
+    } else {
+      collector = TopFieldCollector.create(sort, docsToCollect, true, needScores, needScores, true);
+    }
+    filterCollector = new FilterCollector(docSet, collector);
+    return Arrays.asList((Collector) filterCollector);
+  }
+
+  public QueryCommandResult result() {
+    return new QueryCommandResult(collector.topDocs(), filterCollector.getMatches());
+  }
+
+  public String getKey() {
+    return queryString != null ? queryString : query.toString();
+  }
+
+  public Sort getGroupSort() {
+    return sort;
+  }
+
+  public Sort getSortWithinGroup() {
+    return null;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/QueryCommandResult.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/QueryCommandResult.java
new file mode 100644
index 0000000..766dc3a
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/QueryCommandResult.java
@@ -0,0 +1,42 @@
+package org.apache.solr.search.grouping.distributed.command;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.TopDocs;
+
+/**
+ * Encapsulates {@link TopDocs} and the number of matches.
+ */
+public class QueryCommandResult {
+
+  private final TopDocs topDocs;
+  private final int matches;
+
+  public QueryCommandResult(TopDocs topDocs, int matches) {
+    this.topDocs = topDocs;
+    this.matches = matches;
+  }
+
+  public TopDocs getTopDocs() {
+    return topDocs;
+  }
+
+  public int getMatches() {
+    return matches;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/SearchGroupsFieldCommand.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/SearchGroupsFieldCommand.java
new file mode 100644
index 0000000..a598b72
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/SearchGroupsFieldCommand.java
@@ -0,0 +1,100 @@
+package org.apache.solr.search.grouping.distributed.command;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.grouping.SearchGroup;
+import org.apache.lucene.search.grouping.TermFirstPassGroupingCollector;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.grouping.Command;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ *
+ */
+public class SearchGroupsFieldCommand implements Command<Collection<SearchGroup<String>>> {
+
+  public static class Builder {
+
+    private SchemaField field;
+    private Sort groupSort;
+    private Integer topNGroups;
+
+    public Builder setField(SchemaField field) {
+      this.field = field;
+      return this;
+    }
+
+    public Builder setGroupSort(Sort groupSort) {
+      this.groupSort = groupSort;
+      return this;
+    }
+
+    public Builder setTopNGroups(int topNGroups) {
+      this.topNGroups = topNGroups;
+      return this;
+    }
+
+    public SearchGroupsFieldCommand build() {
+      if (field == null || groupSort == null || topNGroups == null) {
+        throw new IllegalStateException("All fields must be set");
+      }
+
+      return new SearchGroupsFieldCommand(field, groupSort, topNGroups);
+    }
+
+  }
+
+  private final SchemaField field;
+  private final Sort groupSort;
+  private final int topNGroups;
+
+  private TermFirstPassGroupingCollector collector;
+
+  private SearchGroupsFieldCommand(SchemaField field, Sort groupSort, int topNGroups) {
+    this.field = field;
+    this.groupSort = groupSort;
+    this.topNGroups = topNGroups;
+  }
+
+  public List<Collector> create() throws IOException {
+    collector = new TermFirstPassGroupingCollector(field.getName(), groupSort, topNGroups);
+    return Arrays.asList((Collector) collector);
+  }
+
+  public Collection<SearchGroup<String>> result() {
+    return collector.getTopGroups(0, true);
+  }
+
+  public Sort getSortWithinGroup() {
+    return null;
+  }
+
+  public Sort getGroupSort() {
+    return groupSort;
+  }
+
+  public String getKey() {
+    return field.getName();
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/TopGroupsFieldCommand.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/TopGroupsFieldCommand.java
new file mode 100644
index 0000000..20e0ba8
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/command/TopGroupsFieldCommand.java
@@ -0,0 +1,165 @@
+package org.apache.solr.search.grouping.distributed.command;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.grouping.SearchGroup;
+import org.apache.lucene.search.grouping.TermAllGroupsCollector;
+import org.apache.lucene.search.grouping.TermSecondPassGroupingCollector;
+import org.apache.lucene.search.grouping.TopGroups;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.grouping.Command;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ *
+ */
+public class TopGroupsFieldCommand implements Command<TopGroups<String>> {
+
+  public static class Builder {
+
+    private SchemaField field;
+    private Sort groupSort;
+    private Sort sortWithinGroup;
+    private Collection<SearchGroup<String>> firstPhaseGroups;
+    private Integer maxDocPerGroup;
+    private boolean needScores = false;
+    private boolean needMaxScore = false;
+    private boolean needGroupCount = false;
+
+    public Builder setField(SchemaField field) {
+      this.field = field;
+      return this;
+    }
+
+    public Builder setGroupSort(Sort groupSort) {
+      this.groupSort = groupSort;
+      return this;
+    }
+
+    public Builder setSortWithinGroup(Sort sortWithinGroup) {
+      this.sortWithinGroup = sortWithinGroup;
+      return this;
+    }
+
+    public Builder setFirstPhaseGroups(Collection<SearchGroup<String>> firstPhaseGroups) {
+      this.firstPhaseGroups = firstPhaseGroups;
+      return this;
+    }
+
+    public Builder setMaxDocPerGroup(int maxDocPerGroup) {
+      this.maxDocPerGroup = maxDocPerGroup;
+      return this;
+    }
+
+    public Builder setNeedScores(Boolean needScores) {
+      this.needScores = needScores;
+      return this;
+    }
+
+    public Builder setNeedMaxScore(Boolean needMaxScore) {
+      this.needMaxScore = needMaxScore;
+      return this;
+    }
+
+    public Builder setNeedGroupCount(Boolean needGroupCount) {
+      this.needGroupCount = needGroupCount;
+      return this;
+    }
+
+    public TopGroupsFieldCommand build() {
+      if (field == null || groupSort == null ||  sortWithinGroup == null || firstPhaseGroups == null ||
+          maxDocPerGroup == null) {
+        throw new IllegalStateException("All required fields must be set");
+      }
+
+      return new TopGroupsFieldCommand(field, groupSort, sortWithinGroup, firstPhaseGroups, maxDocPerGroup, needScores, needMaxScore, needGroupCount);
+    }
+
+  }
+
+  private final SchemaField field;
+  private final Sort groupSort;
+  private final Sort sortWithinGroup;
+  private final Collection<SearchGroup<String>> firstPhaseGroups;
+  private final int maxDocPerGroup;
+  private final boolean needScores;
+  private final boolean needMaxScore;
+  private final boolean needGroupCount;
+
+  private TermSecondPassGroupingCollector secondPassCollector;
+  private TermAllGroupsCollector allGroupsCollector;
+
+  private TopGroupsFieldCommand(SchemaField field,
+                                Sort groupSort,
+                                Sort sortWithinGroup,
+                                Collection<SearchGroup<String>> firstPhaseGroups,
+                                int maxDocPerGroup,
+                                boolean needScores,
+                                boolean needMaxScore,
+                                boolean needGroupCount) {
+    this.field = field;
+    this.groupSort = groupSort;
+    this.sortWithinGroup = sortWithinGroup;
+    this.firstPhaseGroups = firstPhaseGroups;
+    this.maxDocPerGroup = maxDocPerGroup;
+    this.needScores = needScores;
+    this.needMaxScore = needMaxScore;
+    this.needGroupCount = needGroupCount;
+  }
+
+  public List<Collector> create() throws IOException {
+    List<Collector> collectors = new ArrayList<Collector>();
+    secondPassCollector = new TermSecondPassGroupingCollector(
+          field.getName(), firstPhaseGroups, groupSort, sortWithinGroup, maxDocPerGroup, needScores, needMaxScore, true
+    );
+    collectors.add(secondPassCollector);
+    if (!needGroupCount) {
+      return collectors;
+    }
+    allGroupsCollector = new TermAllGroupsCollector(field.getName());
+    collectors.add(allGroupsCollector);
+    return collectors;
+  }
+
+  public TopGroups<String> result() {
+    TopGroups<String> result = secondPassCollector.getTopGroups(0);
+    if (allGroupsCollector != null) {
+      result = new TopGroups<String>(result, allGroupsCollector.getGroupCount());
+    }
+    return result;
+  }
+
+  public String getKey() {
+    return field.getName();
+  }
+
+  public Sort getGroupSort() {
+    return groupSort;
+  }
+
+  public Sort getSortWithinGroup() {
+    return sortWithinGroup;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/SearchGroupsRequestFactory.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/SearchGroupsRequestFactory.java
new file mode 100644
index 0000000..2b83dd0
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/SearchGroupsRequestFactory.java
@@ -0,0 +1,84 @@
+package org.apache.solr.search.grouping.distributed.requestfactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.handler.component.ResponseBuilder;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.grouping.GroupingSpecification;
+import org.apache.solr.search.grouping.distributed.ShardRequestFactory;
+
+/**
+ * Concrete implementation of {@link ShardRequestFactory} that creates {@link ShardRequest} instances for getting the
+ * search groups from all shards.
+ */
+public class SearchGroupsRequestFactory implements ShardRequestFactory {
+
+  /**
+   * {@inheritDoc}
+   */
+  public ShardRequest[] constructRequest(ResponseBuilder rb) {
+    ShardRequest sreq = new ShardRequest();
+    GroupingSpecification groupingSpecification = rb.getGroupingSpec();
+    if (groupingSpecification.getFields().length == 0) {
+      return new ShardRequest[0];
+    }
+
+    sreq.purpose = ShardRequest.PURPOSE_GET_TOP_GROUPS;
+
+    sreq.params = new ModifiableSolrParams(rb.req.getParams());
+    // TODO: base on current params or original params?
+
+    // don't pass through any shards param
+    sreq.params.remove(ShardParams.SHARDS);
+
+    // set the start (offset) to 0 for each shard request so we can properly merge
+    // results from the start.
+    if(rb.shards_start > -1) {
+      // if the client set shards.start set this explicitly
+      sreq.params.set(CommonParams.START,rb.shards_start);
+    } else {
+      sreq.params.set(CommonParams.START, "0");
+    }
+    // TODO: should we even use the SortSpec?  That's obtained from the QParser, and
+    // perhaps we shouldn't attempt to parse the query at this level?
+    // Alternate Idea: instead of specifying all these things at the upper level,
+    // we could just specify that this is a shard request.
+    if(rb.shards_rows > -1) {
+      // if the client set shards.rows set this explicity
+      sreq.params.set(CommonParams.ROWS,rb.shards_rows);
+    } else {
+      sreq.params.set(CommonParams.ROWS, rb.getSortSpec().getOffset() + rb.getSortSpec().getCount());
+    }
+
+    // in this first phase, request only the unique key field
+    // and any fields needed for merging.
+    sreq.params.set("group.distibuted.first","true");
+
+    if ( (rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES)!=0 || rb.getSortSpec().includesScore()) {
+      sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName() + ",score");
+    } else {
+      sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName());
+    }
+    return new ShardRequest[] {sreq};
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/StoredFieldsShardRequestFactory.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/StoredFieldsShardRequestFactory.java
new file mode 100644
index 0000000..bef74f7
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/StoredFieldsShardRequestFactory.java
@@ -0,0 +1,98 @@
+package org.apache.solr.search.grouping.distributed.requestfactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.grouping.GroupDocs;
+import org.apache.lucene.search.grouping.TopGroups;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.GroupParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.handler.component.ResponseBuilder;
+import org.apache.solr.handler.component.ShardDoc;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.grouping.distributed.ShardRequestFactory;
+import org.apache.solr.search.grouping.distributed.command.QueryCommandResult;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class StoredFieldsShardRequestFactory implements ShardRequestFactory {
+
+  public ShardRequest[] constructRequest(ResponseBuilder rb) {
+    HashMap<String, Set<ShardDoc>> shardMap = new HashMap<String,Set<ShardDoc>>();
+    for (TopGroups<String> topGroups : rb.mergedTopGroups.values()) {
+      for (GroupDocs<String> group : topGroups.groups) {
+        mapShardToDocs(shardMap, group.scoreDocs);
+      }
+    }
+
+    for (QueryCommandResult queryCommandResult : rb.mergedQueryCommandResults.values()) {
+      mapShardToDocs(shardMap, queryCommandResult.getTopDocs().scoreDocs);
+    }
+
+    ShardRequest[] shardRequests = new ShardRequest[shardMap.size()];
+    SchemaField uniqueField = rb.req.getSchema().getUniqueKeyField();
+    int i = 0;
+    for (Collection<ShardDoc> shardDocs : shardMap.values()) {
+      ShardRequest sreq = new ShardRequest();
+      sreq.purpose = ShardRequest.PURPOSE_GET_FIELDS;
+      sreq.shards = new String[] {shardDocs.iterator().next().shard};
+      sreq.params = new ModifiableSolrParams();
+      sreq.params.add( rb.req.getParams());
+      sreq.params.remove(GroupParams.GROUP);
+      sreq.params.remove(CommonParams.SORT);
+      sreq.params.remove(ResponseBuilder.FIELD_SORT_VALUES);
+      String fl = sreq.params.get(CommonParams.FL);
+      if (fl != null) {
+         fl = fl.trim();
+        // currently, "score" is synonymous with "*,score" so
+        // don't add "id" if the fl is empty or "score" or it would change the meaning.
+         if (fl.length()!=0 && !"score".equals(fl) && !"*".equals(fl)) {
+           sreq.params.set(CommonParams.FL, fl+','+uniqueField.getName());
+         }
+      }
+
+      List<String> ids = new ArrayList<String>(shardDocs.size());
+      for (ShardDoc shardDoc : shardDocs) {
+        ids.add(shardDoc.id.toString());
+      }
+      sreq.params.add(ShardParams.IDS, StrUtils.join(ids, ','));
+      shardRequests[i++] = sreq;
+    }
+
+    return shardRequests;
+  }
+
+  private void mapShardToDocs(HashMap<String, Set<ShardDoc>> shardMap, ScoreDoc[] scoreDocs) {
+    for (ScoreDoc scoreDoc : scoreDocs) {
+      ShardDoc solrDoc = (ShardDoc) scoreDoc;
+      Set<ShardDoc> shardDocs = shardMap.get(solrDoc.shard);
+      if (shardDocs == null) {
+        shardMap.put(solrDoc.shard, shardDocs = new HashSet<ShardDoc>());
+      }
+      shardDocs.add(solrDoc);
+    }
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/TopGroupsShardRequestFactory.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/TopGroupsShardRequestFactory.java
new file mode 100644
index 0000000..c87c2d8
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/TopGroupsShardRequestFactory.java
@@ -0,0 +1,181 @@
+package org.apache.solr.search.grouping.distributed.requestfactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.analysis.reverse.ReverseStringFilter;
+import org.apache.lucene.search.grouping.SearchGroup;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.GroupParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.handler.component.ResponseBuilder;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.schema.FieldType;
+import org.apache.solr.search.Grouping;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.grouping.distributed.ShardRequestFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Concrete implementation of {@link ShardRequestFactory} that creates {@link ShardRequest} instances for getting the
+ * top groups from all shards.
+ */
+public class TopGroupsShardRequestFactory implements ShardRequestFactory {
+
+  /**
+   * Represents a string value for
+   */
+  public static final String GROUP_NULL_VALUE = "" + ReverseStringFilter.START_OF_HEADING_MARKER;
+
+  /**
+   * {@inheritDoc}
+   */
+  public ShardRequest[] constructRequest(ResponseBuilder rb) {
+    // If we have a group.query we need to query all shards... Or we move this to the group first phase queries
+    boolean containsGroupByQuery = rb.getGroupingSpec().getQueries().length > 0;
+    // TODO: If groups.truncate=true we only have to query the specific shards even faceting and statistics are enabled
+    if ((rb.getQueryCommand().getFlags() & SolrIndexSearcher.GET_DOCSET) != 0 || containsGroupByQuery) {
+      // In case we need more results such as faceting and statistics we have to query all shards
+      return createRequestForAllShards(rb);
+    } else {
+      // In case we only need top groups we only have to query the shards that contain these groups.
+      return createRequestForSpecificShards(rb);
+    }
+  }
+
+  private ShardRequest[] createRequestForSpecificShards(ResponseBuilder rb) {
+    // Determine all unique shards to query for TopGroups
+    Set<String> shards = new HashSet<String>();
+    for (String command : rb.searchGroupToShard.keySet()) {
+      Map<SearchGroup<String>, String> groupsToShard = rb.searchGroupToShard.get(command);
+      shards.addAll(groupsToShard.values());
+    }
+
+    ShardRequest[] sreqs = new ShardRequest[shards.size()];
+    int i = 0;
+    for (String shard : shards) {
+      ShardRequest sreq = new ShardRequest();
+      sreq.purpose = ShardRequest.PURPOSE_GET_TOP_IDS;
+      sreq.actualShards = new String[] {shard};
+      sreq.params = new ModifiableSolrParams(rb.req.getParams());
+
+      // If group.format=simple group.offset doesn't make sense
+      Grouping.Format responseFormat = rb.getGroupingSpec().getResponseFormat();
+      if (responseFormat == Grouping.Format.simple || rb.getGroupingSpec().isMain()) {
+        sreq.params.remove(GroupParams.GROUP_OFFSET);
+      }
+
+      sreq.params.remove(ShardParams.SHARDS);
+
+      // set the start (offset) to 0 for each shard request so we can properly merge
+      // results from the start.
+      if(rb.shards_start > -1) {
+        // if the client set shards.start set this explicitly
+        sreq.params.set(CommonParams.START,rb.shards_start);
+      } else {
+        sreq.params.set(CommonParams.START, "0");
+      }
+      if(rb.shards_rows > -1) {
+        // if the client set shards.rows set this explicity
+        sreq.params.set(CommonParams.ROWS,rb.shards_rows);
+      } else {
+        sreq.params.set(CommonParams.ROWS, rb.getSortSpec().getOffset() + rb.getSortSpec().getCount());
+      }
+
+      sreq.params.set("group.distibuted.second","true");
+      for (Map.Entry<String, Collection<SearchGroup<String>>> entry : rb.mergedSearchGroups.entrySet()) {
+        for (SearchGroup<String> searchGroup : entry.getValue()) {
+          String groupValue;
+          if (searchGroup.groupValue != null) {
+            String rawGroupValue = searchGroup.groupValue;
+            FieldType fieldType = rb.req.getSearcher().getSchema().getField(entry.getKey()).getType();
+            groupValue = fieldType.indexedToReadable(rawGroupValue);
+          } else {
+            groupValue = GROUP_NULL_VALUE;
+          }
+          sreq.params.add("group.topgroups." + entry.getKey(), groupValue);
+        }
+      }
+
+      if ((rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES) != 0 || rb.getSortSpec().includesScore()) {
+        sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName() + ",score");
+      } else {
+        sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName());
+      }
+      sreqs[i++] = sreq;
+    }
+
+    return sreqs;
+  }
+
+  private ShardRequest[] createRequestForAllShards(ResponseBuilder rb) {
+    ShardRequest sreq = new ShardRequest();
+    sreq.purpose = ShardRequest.PURPOSE_GET_TOP_IDS;
+
+    sreq.params = new ModifiableSolrParams(rb.req.getParams());
+    // If group.format=simple group.offset doesn't make sense
+    Grouping.Format responseFormat = rb.getGroupingSpec().getResponseFormat();
+    if (responseFormat == Grouping.Format.simple || rb.getGroupingSpec().isMain()) {
+      sreq.params.remove(GroupParams.GROUP_OFFSET);
+    }
+    sreq.params.remove(ShardParams.SHARDS);
+
+    // set the start (offset) to 0 for each shard request so we can properly merge
+    // results from the start.
+    if(rb.shards_start > -1) {
+      // if the client set shards.start set this explicitly
+      sreq.params.set(CommonParams.START,rb.shards_start);
+    } else {
+      sreq.params.set(CommonParams.START, "0");
+    }
+    if(rb.shards_rows > -1) {
+      // if the client set shards.rows set this explicity
+      sreq.params.set(CommonParams.ROWS,rb.shards_rows);
+    } else {
+      sreq.params.set(CommonParams.ROWS, rb.getSortSpec().getOffset() + rb.getSortSpec().getCount());
+    }
+
+    sreq.params.set("group.distibuted.second","true");
+    for (Map.Entry<String, Collection<SearchGroup<String>>> entry : rb.mergedSearchGroups.entrySet()) {
+      for (SearchGroup<String> searchGroup : entry.getValue()) {
+        String groupValue;
+        if (searchGroup.groupValue != null) {
+         String rawGroupValue = searchGroup.groupValue;
+          FieldType fieldType = rb.req.getSearcher().getSchema().getField(entry.getKey()).getType();
+          groupValue = fieldType.indexedToReadable(rawGroupValue);
+        } else {
+          groupValue = GROUP_NULL_VALUE;
+        }
+        sreq.params.add("group.topgroups." + entry.getKey(), groupValue);
+      }
+    }
+
+    if ( (rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES)!=0 || rb.getSortSpec().includesScore()) {
+      sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName() + ",score");
+    } else {
+      sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName());
+    }
+
+    return new ShardRequest[] {sreq};
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/responseprocessor/SearchGroupShardResponseProcessor.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/responseprocessor/SearchGroupShardResponseProcessor.java
new file mode 100644
index 0000000..a58e871
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/responseprocessor/SearchGroupShardResponseProcessor.java
@@ -0,0 +1,92 @@
+package org.apache.solr.search.grouping.distributed.responseprocessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.grouping.SearchGroup;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.component.ResponseBuilder;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.search.SortSpec;
+import org.apache.solr.search.grouping.distributed.ShardResponseProcessor;
+import org.apache.solr.search.grouping.distributed.shardresultserializer.SearchGroupsResultTransformer;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Concrete implementation for merging {@link SearchGroup} instances from shard responses.
+ */
+public class SearchGroupShardResponseProcessor implements ShardResponseProcessor {
+
+  /**
+   * {@inheritDoc}
+   */
+  public void process(ResponseBuilder rb, ShardRequest shardRequest) {
+    SortSpec ss = rb.getSortSpec();
+    Sort groupSort = rb.getGroupingSpec().getGroupSort();
+    String[] fields = rb.getGroupingSpec().getFields();
+
+    Map<String, List<Collection<SearchGroup<String>>>> commandSearchGroups = new HashMap<String, List<Collection<SearchGroup<String>>>>();
+    Map<String, Map<SearchGroup<String>, String>> tempSearchGroupToShard = new HashMap<String, Map<SearchGroup<String>, String>>();
+    for (String field : fields) {
+      commandSearchGroups.put(field, new ArrayList<Collection<SearchGroup<String>>>(shardRequest.responses.size()));
+      tempSearchGroupToShard.put(field, new HashMap<SearchGroup<String>, String>());
+      if (!rb.searchGroupToShard.containsKey(field)) {
+        rb.searchGroupToShard.put(field, new HashMap<SearchGroup<String>, String>());
+      }
+    }
+
+    SearchGroupsResultTransformer serializer = new SearchGroupsResultTransformer(rb.req.getSearcher());
+    try {
+      for (ShardResponse srsp : shardRequest.responses) {
+        @SuppressWarnings("unchecked")
+        NamedList<NamedList> firstPhaseResult = (NamedList<NamedList>) srsp.getSolrResponse().getResponse().get("firstPhase");
+        Map<String, Collection<SearchGroup<String>>> result = serializer.transformToNative(firstPhaseResult, groupSort, null, srsp.getShard());
+        for (String field : commandSearchGroups.keySet()) {
+          Collection<SearchGroup<String>> searchGroups = result.get(field);
+          if (searchGroups == null) {
+            continue;
+          }
+
+          commandSearchGroups.get(field).add(searchGroups);
+          for (SearchGroup<String> searchGroup : searchGroups) {
+            tempSearchGroupToShard.get(field).put(searchGroup, srsp.getShard());
+          }
+        }
+      }
+      for (String groupField : commandSearchGroups.keySet()) {
+        List<Collection<SearchGroup<String>>> topGroups = commandSearchGroups.get(groupField);
+        Collection<SearchGroup<String>> mergedTopGroups = SearchGroup.merge(topGroups, ss.getOffset(), ss.getCount(), groupSort);
+        if (mergedTopGroups == null) {
+          continue;
+        }
+
+        rb.mergedSearchGroups.put(groupField, mergedTopGroups);
+        for (SearchGroup<String> mergedTopGroup : mergedTopGroups) {
+          rb.searchGroupToShard.get(groupField).put(mergedTopGroup, tempSearchGroupToShard.get(groupField).get(mergedTopGroup));
+        }
+      }
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/responseprocessor/StoredFieldsShardResponseProcessor.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/responseprocessor/StoredFieldsShardResponseProcessor.java
new file mode 100644
index 0000000..5112e1e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/responseprocessor/StoredFieldsShardResponseProcessor.java
@@ -0,0 +1,56 @@
+package org.apache.solr.search.grouping.distributed.responseprocessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.FieldDoc;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.handler.component.ResponseBuilder;
+import org.apache.solr.handler.component.ShardDoc;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.grouping.distributed.ShardResponseProcessor;
+
+/**
+ * Concrete implementation for processing the stored field values from shard responses.
+ */
+public class StoredFieldsShardResponseProcessor implements ShardResponseProcessor {
+
+  /**
+   * {@inheritDoc}
+   */
+  public void process(ResponseBuilder rb, ShardRequest shardRequest) {
+    boolean returnScores = (rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES) != 0;
+    ShardResponse srsp = shardRequest.responses.get(0);
+    SolrDocumentList docs = (SolrDocumentList)srsp.getSolrResponse().getResponse().get("response");
+    String uniqueIdFieldName = rb.req.getSchema().getUniqueKeyField().getName();
+
+    for (SolrDocument doc : docs) {
+      Object id = doc.getFieldValue(uniqueIdFieldName).toString();
+      ShardDoc shardDoc = rb.resultIds.get(id);
+      FieldDoc fieldDoc = (FieldDoc) shardDoc;
+      if (shardDoc != null) {
+        if (returnScores && !Float.isNaN(fieldDoc.score)) {
+            doc.setField("score", fieldDoc.score);
+        }
+        rb.retrievedDocuments.put(id, doc);
+      }
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/responseprocessor/TopGroupsShardResponseProcessor.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/responseprocessor/TopGroupsShardResponseProcessor.java
new file mode 100644
index 0000000..32d706e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/responseprocessor/TopGroupsShardResponseProcessor.java
@@ -0,0 +1,140 @@
+package org.apache.solr.search.grouping.distributed.responseprocessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.grouping.GroupDocs;
+import org.apache.lucene.search.grouping.TopGroups;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.component.ResponseBuilder;
+import org.apache.solr.handler.component.ShardDoc;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.search.Grouping;
+import org.apache.solr.search.grouping.distributed.ShardResponseProcessor;
+import org.apache.solr.search.grouping.distributed.command.QueryCommandResult;
+import org.apache.solr.search.grouping.distributed.shardresultserializer.TopGroupsResultTransformer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Concrete implementation for merging {@link TopGroups} instances from shard responses.
+ */
+public class TopGroupsShardResponseProcessor implements ShardResponseProcessor {
+
+  /**
+   * {@inheritDoc}
+   */
+  @SuppressWarnings("unchecked")
+  public void process(ResponseBuilder rb, ShardRequest shardRequest) {
+    Sort groupSort = rb.getGroupingSpec().getGroupSort();
+    String[] fields = rb.getGroupingSpec().getFields();
+    String[] queries = rb.getGroupingSpec().getQueries();
+    Sort sortWithinGroup = rb.getGroupingSpec().getSortWithinGroup();
+
+    // If group.format=simple group.offset doesn't make sense
+    int groupOffsetDefault;
+    if (rb.getGroupingSpec().getResponseFormat() == Grouping.Format.simple || rb.getGroupingSpec().isMain()) {
+      groupOffsetDefault = 0;
+    } else {
+      groupOffsetDefault = rb.getGroupingSpec().getGroupOffset();
+    }
+    int docsPerGroupDefault = rb.getGroupingSpec().getGroupLimit();
+
+    Map<String, List<TopGroups<String>>> commandTopGroups = new HashMap<String, List<TopGroups<String>>>();
+    for (String field : fields) {
+      commandTopGroups.put(field, new ArrayList<TopGroups<String>>());
+    }
+
+    Map<String, List<QueryCommandResult>> commandTopDocs = new HashMap<String, List<QueryCommandResult>>();
+    for (String query : queries) {
+      commandTopDocs.put(query, new ArrayList<QueryCommandResult>());
+    }
+
+    TopGroupsResultTransformer serializer = new TopGroupsResultTransformer(rb);
+    for (ShardResponse srsp : shardRequest.responses) {
+      NamedList<NamedList> secondPhaseResult = (NamedList<NamedList>) srsp.getSolrResponse().getResponse().get("secondPhase");
+      Map<String, ?> result = serializer.transformToNative(secondPhaseResult, groupSort, sortWithinGroup, srsp.getShard());
+      for (String field : commandTopGroups.keySet()) {
+        TopGroups<String> topGroups = (TopGroups<String>) result.get(field);
+        if (topGroups == null) {
+          continue;
+        }
+        commandTopGroups.get(field).add(topGroups);
+      }
+      for (String query : queries) {
+        commandTopDocs.get(query).add((QueryCommandResult) result.get(query));
+      }
+    }
+    try {
+      for (String groupField : commandTopGroups.keySet()) {
+        List<TopGroups<String>> topGroups = commandTopGroups.get(groupField);
+        if (topGroups.isEmpty()) {
+          continue;
+        }
+
+        TopGroups<String>[] topGroupsArr = new TopGroups[topGroups.size()];
+        rb.mergedTopGroups.put(groupField, TopGroups.merge(topGroups.toArray(topGroupsArr), groupSort, sortWithinGroup, groupOffsetDefault, docsPerGroupDefault));
+      }
+
+      for (String query : commandTopDocs.keySet()) {
+        List<QueryCommandResult> queryCommandResults = commandTopDocs.get(query);
+        List<TopDocs> topDocs = new ArrayList<TopDocs>(queryCommandResults.size());
+        int mergedMatches = 0;
+        for (QueryCommandResult queryCommandResult : queryCommandResults) {
+          topDocs.add(queryCommandResult.getTopDocs());
+          mergedMatches += queryCommandResult.getMatches();
+        }
+
+        int topN = rb.getGroupingSpec().getOffset() + rb.getGroupingSpec().getLimit();
+        TopDocs mergedTopDocs = TopDocs.merge(sortWithinGroup, topN, topDocs.toArray(new TopDocs[topDocs.size()]));
+        rb.mergedQueryCommandResults.put(query, new QueryCommandResult(mergedTopDocs, mergedMatches));
+      }
+
+      Map<Object, ShardDoc> resultIds = new HashMap<Object, ShardDoc>();
+      int i = 0;
+      for (TopGroups<String> topGroups : rb.mergedTopGroups.values()) {
+        for (GroupDocs<String> group : topGroups.groups) {
+          for (ScoreDoc scoreDoc : group.scoreDocs) {
+            ShardDoc solrDoc = (ShardDoc) scoreDoc;
+            solrDoc.positionInResponse = i++;
+            resultIds.put(solrDoc.id, solrDoc);
+          }
+        }
+      }
+      for (QueryCommandResult queryCommandResult : rb.mergedQueryCommandResults.values()) {
+        for (ScoreDoc scoreDoc : queryCommandResult.getTopDocs().scoreDocs) {
+          ShardDoc solrDoc = (ShardDoc) scoreDoc;
+          solrDoc.positionInResponse = i++;
+          resultIds.put(solrDoc.id, solrDoc);
+        }
+      }
+
+      rb.resultIds = resultIds;
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/shardresultserializer/SearchGroupsResultTransformer.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/shardresultserializer/SearchGroupsResultTransformer.java
new file mode 100644
index 0000000..c7c9a91
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/shardresultserializer/SearchGroupsResultTransformer.java
@@ -0,0 +1,112 @@
+package org.apache.solr.search.grouping.distributed.shardresultserializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.grouping.SearchGroup;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.schema.FieldType;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.grouping.Command;
+import org.apache.solr.search.grouping.distributed.command.SearchGroupsFieldCommand;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Implementation for transforming {@link SearchGroup} into a {@link NamedList} structure and visa versa.
+ */
+public class SearchGroupsResultTransformer implements ShardResultTransformer<List<Command>, Map<String, Collection<SearchGroup<String>>>> {
+
+  private final SolrIndexSearcher searcher;
+
+  public SearchGroupsResultTransformer(SolrIndexSearcher searcher) {
+    this.searcher = searcher;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public NamedList transform(List<Command> data) throws IOException {
+    NamedList<NamedList> result = new NamedList<NamedList>();
+    for (Command command : data) {
+      NamedList commandResult;
+      if (SearchGroupsFieldCommand.class.isInstance(command)) {
+        SearchGroupsFieldCommand fieldCommand = (SearchGroupsFieldCommand) command;
+        Collection<SearchGroup<String>> searchGroups = fieldCommand.result();
+        if (searchGroups == null) {
+          continue;
+        }
+
+        commandResult = serializeSearchGroup(searchGroups, fieldCommand.getGroupSort());
+      } else {
+        commandResult = null;
+      }
+
+      result.add(command.getKey(), commandResult);
+    }
+    return result;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public Map<String, Collection<SearchGroup<String>>> transformToNative(NamedList<NamedList> shardResponse, Sort groupSort, Sort sortWithinGroup, String shard) throws IOException {
+    Map<String, Collection<SearchGroup<String>>> result = new HashMap<String, Collection<SearchGroup<String>>>();
+    for (Map.Entry<String, NamedList> command : shardResponse) {
+      List<SearchGroup<String>> searchGroups = new ArrayList<SearchGroup<String>>();
+      @SuppressWarnings("unchecked")
+      NamedList<List<Comparable>> rawSearchGroups = command.getValue();
+      for (Map.Entry<String, List<Comparable>> rawSearchGroup : rawSearchGroups){
+        SearchGroup<String> searchGroup = new SearchGroup<String>();
+        searchGroup.groupValue = rawSearchGroup.getKey() != null ? rawSearchGroup.getKey() : null;
+        searchGroup.sortValues = rawSearchGroup.getValue().toArray(new Comparable[rawSearchGroup.getValue().size()]);
+        searchGroups.add(searchGroup);
+      }
+
+      result.put(command.getKey(), searchGroups);
+    }
+    return result;
+  }
+
+  private NamedList serializeSearchGroup(Collection<SearchGroup<String>> data, Sort groupSort) {
+    NamedList<Comparable[]> result = new NamedList<Comparable[]>();
+
+    for (SearchGroup<String> searchGroup : data) {
+      Comparable[] convertedSortValues = new Comparable[searchGroup.sortValues.length];
+      for (int i = 0; i < searchGroup.sortValues.length; i++) {
+        Comparable sortValue = (Comparable) searchGroup.sortValues[i];
+        SchemaField field = groupSort.getSort()[i].getField() != null ? searcher.getSchema().getFieldOrNull(groupSort.getSort()[i].getField()) : null;
+        if (field != null) {
+          FieldType fieldType = field.getType();
+          if (sortValue instanceof String) {
+            sortValue = (Comparable) fieldType.toObject(field.createField(fieldType.indexedToReadable((String) sortValue), 0.0f));
+          }
+        }
+        convertedSortValues[i] = sortValue;
+      }
+      String groupValue = searchGroup.groupValue != null ? searchGroup.groupValue : null;
+      result.add(groupValue, convertedSortValues);
+    }
+
+    return result;
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/shardresultserializer/ShardResultTransformer.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/shardresultserializer/ShardResultTransformer.java
new file mode 100644
index 0000000..c0902f7
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/shardresultserializer/ShardResultTransformer.java
@@ -0,0 +1,53 @@
+package org.apache.solr.search.grouping.distributed.shardresultserializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.Sort;
+import org.apache.solr.common.util.NamedList;
+
+import java.io.IOException;
+
+/**
+ * A <code>ShardResultTransformer</code> is responsible for transforming a grouped shard result into group related
+ * structures (such as {@link org.apache.lucene.search.grouping.TopGroups} and {@link org.apache.lucene.search.grouping.SearchGroup})
+ * and visa versa.
+ */
+public interface ShardResultTransformer<T, R> {
+
+  /**
+   * Transforms data to a {@link NamedList} structure for serialization purposes.
+   *
+   * @param data The data to be transformed
+   * @return {@link NamedList} structure
+   * @throws IOException If I/O related errors occur during transforming
+   */
+  NamedList transform(T data) throws IOException;
+
+  /**
+   * Transforms the specified shard response into native structures.
+   *
+   * @param shardResponse The shard response containing data in a {@link NamedList} structure
+   * @param groupSort The group sort
+   * @param sortWithinGroup The sort inside a group
+   * @param shard The shard address where the response originated from
+   * @return native structure of the data
+   * @throws IOException If I/O related errors occur during transforming
+   */
+  R transformToNative(NamedList<NamedList> shardResponse, Sort groupSort, Sort sortWithinGroup, String shard) throws IOException;
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/shardresultserializer/TopGroupsResultTransformer.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/shardresultserializer/TopGroupsResultTransformer.java
new file mode 100644
index 0000000..d24fca5
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/shardresultserializer/TopGroupsResultTransformer.java
@@ -0,0 +1,272 @@
+package org.apache.solr.search.grouping.distributed.shardresultserializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.FieldSelector;
+import org.apache.lucene.document.FieldSelectorResult;
+import org.apache.lucene.search.FieldDoc;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.grouping.GroupDocs;
+import org.apache.lucene.search.grouping.TopGroups;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.component.ResponseBuilder;
+import org.apache.solr.handler.component.ShardDoc;
+import org.apache.solr.schema.FieldType;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.grouping.Command;
+import org.apache.solr.search.grouping.distributed.command.QueryCommand;
+import org.apache.solr.search.grouping.distributed.command.QueryCommandResult;
+import org.apache.solr.search.grouping.distributed.command.TopGroupsFieldCommand;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation for transforming {@link TopGroups} and {@link TopDocs} into a {@link NamedList} structure and
+ * visa versa.
+ */
+public class TopGroupsResultTransformer implements ShardResultTransformer<List<Command>, Map<String, ?>> {
+
+  private final ResponseBuilder rb;
+
+  public TopGroupsResultTransformer(ResponseBuilder rb) {
+    this.rb = rb;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public NamedList transform(List<Command> data) throws IOException {
+    NamedList<NamedList> result = new NamedList<NamedList>();
+    for (Command command : data) {
+      NamedList commandResult;
+      if (TopGroupsFieldCommand.class.isInstance(command)) {
+        TopGroupsFieldCommand fieldCommand = (TopGroupsFieldCommand) command;
+        SchemaField groupField = rb.req.getSearcher().getSchema().getField(fieldCommand.getKey());
+        commandResult = serializeTopGroups(fieldCommand.result(), groupField);
+      } else if (QueryCommand.class.isInstance(command)) {
+        QueryCommand queryCommand = (QueryCommand) command;
+        commandResult = serializeTopDocs(queryCommand.result());
+      } else {
+        commandResult = null;
+      }
+
+      result.add(command.getKey(), commandResult);
+    }
+    return result;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public Map<String, ?> transformToNative(NamedList<NamedList> shardResponse, Sort groupSort, Sort sortWithinGroup, String shard) {
+    Map<String, Object> result = new HashMap<String, Object>();
+
+    for (Map.Entry<String, NamedList> entry : shardResponse) {
+      String key = entry.getKey();
+      NamedList commandResult = entry.getValue();
+      Integer totalGroupedHitCount = (Integer) commandResult.get("totalGroupedHitCount");
+      Integer totalHits = (Integer) commandResult.get("totalHits");
+      if (totalHits != null) {
+        Integer matches = (Integer) commandResult.get("matches");
+        Float maxScore = (Float) commandResult.get("maxScore");
+        if (maxScore == null) {
+          maxScore = Float.NaN;
+        }
+
+        @SuppressWarnings("unchecked")
+        List<NamedList<Object>> documents = (List<NamedList<Object>>) commandResult.get("documents");
+        ScoreDoc[] scoreDocs = new ScoreDoc[documents.size()];
+        int j = 0;
+        for (NamedList<Object> document : documents) {
+          Object uniqueId = document.get("id").toString();
+          Float score = (Float) document.get("score");
+          if (score == null) {
+            score = Float.NaN;
+          }
+          Object[] sortValues = ((List) document.get("sortValues")).toArray();
+          scoreDocs[j++] = new ShardDoc(score, sortValues, uniqueId, shard);
+        }
+        result.put(key, new QueryCommandResult(new TopDocs(totalHits, scoreDocs, maxScore), matches));
+        continue;
+      }
+
+      Integer totalHitCount = (Integer) commandResult.get("totalHitCount");
+      Integer totalGroupCount = (Integer) commandResult.get("totalGroupCount");
+
+      List<GroupDocs<String>> groupDocs = new ArrayList<GroupDocs<String>>();
+      for (int i = totalGroupCount == null ? 2 : 3; i < commandResult.size(); i++) {
+        String groupValue = commandResult.getName(i);
+        @SuppressWarnings("unchecked")
+        NamedList<Object> groupResult = (NamedList<Object>) commandResult.getVal(i);
+        Integer totalGroupHits = (Integer) groupResult.get("totalHits");
+        Float maxScore = (Float) groupResult.get("maxScore");
+        if (maxScore == null) {
+          maxScore = Float.NaN;
+        }
+
+        @SuppressWarnings("unchecked")
+        List<NamedList<Object>> documents = (List<NamedList<Object>>) groupResult.get("documents");
+        ScoreDoc[] scoreDocs = new ScoreDoc[documents.size()];
+        int j = 0;
+        for (NamedList<Object> document : documents) {
+          Object uniqueId = document.get("id").toString();
+          Float score = (Float) document.get("score");
+          if (score == null) {
+            score = Float.NaN;
+          }
+          Object[] sortValues = ((List) document.get("sortValues")).toArray();
+          scoreDocs[j++] = new ShardDoc(score, sortValues, uniqueId, shard);
+        }
+
+        String groupValueRef = groupValue != null ? groupValue : null;
+        groupDocs.add(new GroupDocs<String>(maxScore, totalGroupHits, scoreDocs, groupValueRef, null));
+      }
+
+      @SuppressWarnings("unchecked")
+      GroupDocs<String>[] groupDocsArr = groupDocs.toArray(new GroupDocs[groupDocs.size()]);
+      TopGroups<String> topGroups = new TopGroups<String>(
+        groupSort.getSort(), sortWithinGroup.getSort(), totalHitCount, totalGroupedHitCount, groupDocsArr
+      );
+      if (totalGroupCount != null) {
+        topGroups = new TopGroups<String>(topGroups, totalGroupCount);
+      }
+
+      result.put(key, topGroups);
+    }
+
+    return result;
+  }
+
+  protected NamedList serializeTopGroups(TopGroups<String> data, SchemaField groupField) throws IOException {
+    NamedList<Object> result = new NamedList<Object>();
+    result.add("totalGroupedHitCount", data.totalGroupedHitCount);
+    result.add("totalHitCount", data.totalHitCount);
+    if (data.totalGroupCount != null) {
+      result.add("totalGroupCount", data.totalGroupCount);
+    }
+    SchemaField uniqueField = rb.req.getSearcher().getSchema().getUniqueKeyField();
+    for (GroupDocs<String> searchGroup : data.groups) {
+      NamedList<Object> groupResult = new NamedList<Object>();
+      groupResult.add("totalHits", searchGroup.totalHits);
+      if (!Float.isNaN(searchGroup.maxScore)) {
+        groupResult.add("maxScore", searchGroup.maxScore);
+      }
+
+      List<NamedList<Object>> documents = new ArrayList<NamedList<Object>>();
+      for (int i = 0; i < searchGroup.scoreDocs.length; i++) {
+        NamedList<Object> document = new NamedList<Object>();
+        documents.add(document);
+
+        Document doc = retrieveDocument(uniqueField, searchGroup.scoreDocs[i].doc);
+        document.add("id", uniqueField.getType().toObject(doc.getFieldable(uniqueField.getName())));
+        if (!Float.isNaN(searchGroup.scoreDocs[i].score))  {
+          document.add("score", searchGroup.scoreDocs[i].score);
+        }
+        if (!(searchGroup.scoreDocs[i] instanceof FieldDoc)) {
+          continue;
+        }
+
+        FieldDoc fieldDoc = (FieldDoc) searchGroup.scoreDocs[i];
+        Object[] convertedSortValues  = new Object[fieldDoc.fields.length];
+        for (int j = 0; j < fieldDoc.fields.length; j++) {
+          Object sortValue  = fieldDoc.fields[j];
+          Sort sortWithinGroup = rb.getGroupingSpec().getSortWithinGroup();
+          SchemaField field = sortWithinGroup.getSort()[j].getField() != null ? rb.req.getSearcher().getSchema().getFieldOrNull(sortWithinGroup.getSort()[j].getField()) : null;
+          if (field != null) {
+            FieldType fieldType = field.getType();
+            if (sortValue instanceof String) {
+              sortValue = fieldType.toObject(field.createField(fieldType.indexedToReadable((String) sortValue), 0.0f));
+            }
+          }
+          convertedSortValues[j] = sortValue;
+        }
+        document.add("sortValues", convertedSortValues);
+      }
+      groupResult.add("documents", documents);
+      String groupValue = searchGroup.groupValue != null ? groupField.getType().indexedToReadable(searchGroup.groupValue): null;
+      result.add(groupValue, groupResult);
+    }
+
+    return result;
+  }
+
+  protected NamedList serializeTopDocs(QueryCommandResult result) throws IOException {
+    NamedList<Object> queryResult = new NamedList<Object>();
+    queryResult.add("matches", result.getMatches());
+    queryResult.add("totalHits", result.getTopDocs().totalHits);
+    if (rb.getGroupingSpec().isNeedScore()) {
+      queryResult.add("maxScore", result.getTopDocs().getMaxScore());
+    }
+    List<NamedList> documents = new ArrayList<NamedList>();
+    queryResult.add("documents", documents);
+
+    SchemaField uniqueField = rb.req.getSearcher().getSchema().getUniqueKeyField();
+    for (ScoreDoc scoreDoc : result.getTopDocs().scoreDocs) {
+      NamedList<Object> document = new NamedList<Object>();
+      documents.add(document);
+
+      Document doc = retrieveDocument(uniqueField, scoreDoc.doc);
+      document.add("id", uniqueField.getType().toObject(doc.getFieldable(uniqueField.getName())));
+      if (rb.getGroupingSpec().isNeedScore())  {
+        document.add("score", scoreDoc.score);
+      }
+      if (!FieldDoc.class.isInstance(scoreDoc)) {
+        continue;
+      }
+
+      FieldDoc fieldDoc = (FieldDoc) scoreDoc;
+      Object[] convertedSortValues  = new Object[fieldDoc.fields.length];
+      for (int j = 0; j < fieldDoc.fields.length; j++) {
+        Object sortValue  = fieldDoc.fields[j];
+        Sort groupSort = rb.getGroupingSpec().getGroupSort();
+        SchemaField field = groupSort.getSort()[j].getField() != null ? rb.req.getSearcher().getSchema().getFieldOrNull(groupSort.getSort()[j].getField()) : null;
+        if (field != null) {
+          FieldType fieldType = field.getType();
+          if (sortValue instanceof String) {
+            sortValue = fieldType.toObject(field.createField(fieldType.indexedToReadable((String) sortValue), 0.0f));
+          }
+        }
+        convertedSortValues[j] = sortValue;
+      }
+      document.add("sortValues", convertedSortValues);
+    }
+
+    return queryResult;
+  }
+
+  private Document retrieveDocument(final SchemaField uniqueField, int doc) throws IOException {
+    FieldSelector fieldSelectorVisitor = new FieldSelector() {
+
+      public FieldSelectorResult accept(String fieldName) {
+        if (uniqueField.getName().equals(fieldName)) {
+          return FieldSelectorResult.LOAD_AND_BREAK;
+        }
+        return FieldSelectorResult.NO_LOAD;
+      }
+    };
+    return rb.req.getSearcher().doc(doc, fieldSelectorVisitor);
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/EndResultTransformer.java b/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/EndResultTransformer.java
new file mode 100644
index 0000000..b58b4b7
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/EndResultTransformer.java
@@ -0,0 +1,52 @@
+package org.apache.solr.search.grouping.endresulttransformer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.search.grouping.GroupingSpecification;
+
+import java.util.Map;
+
+/**
+ * Responsible for transforming the grouped result into the final format for displaying purposes.
+ */
+public interface EndResultTransformer {
+
+  /**
+   * Transforms the specified result into its final form and puts it into the specified response.
+   *
+   * @param result The map containing the grouping result (for grouping by field and query)
+   * @param response The response that will be rendered to the client
+   * @param groupingSpecification The grouping specification
+   * @param solrDocumentSource The source of {@link SolrDocument} instances
+   */
+  void transform(Map<String, ?> result, SolrQueryResponse response, GroupingSpecification groupingSpecification, SolrDocumentSource solrDocumentSource);
+
+  /**
+   * Abstracts the source for {@link SolrDocument} instances.
+   * The source of documents is different for a distributed search than local search
+   */
+  public interface SolrDocumentSource {
+
+    SolrDocument retrieve(ScoreDoc doc);
+
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/GroupedEndResultTransformer.java b/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/GroupedEndResultTransformer.java
new file mode 100644
index 0000000..d42e367
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/GroupedEndResultTransformer.java
@@ -0,0 +1,107 @@
+package org.apache.solr.search.grouping.endresulttransformer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.grouping.GroupDocs;
+import org.apache.lucene.search.grouping.TopGroups;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.FieldType;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.grouping.GroupingSpecification;
+import org.apache.solr.search.grouping.distributed.command.QueryCommandResult;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class GroupedEndResultTransformer implements EndResultTransformer {
+
+  private final SolrIndexSearcher searcher;
+
+  public GroupedEndResultTransformer(SolrIndexSearcher searcher) {
+    this.searcher = searcher;
+  }
+
+  public void transform(Map<String, ?> result, SolrQueryResponse response, GroupingSpecification groupingSpecification, SolrDocumentSource solrDocumentSource) {
+    NamedList<Object> commands = new NamedList<Object>();
+    for (Map.Entry<String, ?> entry : result.entrySet()) {
+      Object value = entry.getValue();
+      if (TopGroups.class.isInstance(value)) {
+        @SuppressWarnings("unchecked")
+        TopGroups<String> topGroups = (TopGroups<String>) value;
+        NamedList<Object> command = new SimpleOrderedMap<Object>();
+        command.add("matches", topGroups.totalHitCount);
+        if (topGroups.totalGroupCount != null) {
+          command.add("ngroups", topGroups.totalGroupCount);
+        }
+
+        List<NamedList> groups = new ArrayList<NamedList>();
+        SchemaField groupField = searcher.getSchema().getField(entry.getKey());
+        FieldType groupFieldType = groupField.getType();
+        for (GroupDocs<String> group : topGroups.groups) {
+          SimpleOrderedMap<Object> groupResult = new SimpleOrderedMap<Object>();
+          if (group.groupValue != null) {
+            groupResult.add(
+                "groupValue", groupFieldType.toObject(groupField.createField(group.groupValue, 0.0f))
+            );
+          } else {
+            groupResult.add("groupValue", null);
+          }
+          SolrDocumentList docList = new SolrDocumentList();
+          docList.setNumFound(group.totalHits);
+          if (!Float.isNaN(group.maxScore)) {
+            docList.setMaxScore(group.maxScore);
+          }
+          docList.setStart(groupingSpecification.getGroupOffset());
+          for (ScoreDoc scoreDoc : group.scoreDocs) {
+            docList.add(solrDocumentSource.retrieve(scoreDoc));
+          }
+          groupResult.add("doclist", docList);
+          groups.add(groupResult);
+        }
+        command.add("groups", groups);
+        commands.add(entry.getKey(), command);
+      } else if (QueryCommandResult.class.isInstance(value)) {
+        QueryCommandResult queryCommandResult = (QueryCommandResult) value;
+        NamedList<Object> command = new SimpleOrderedMap<Object>();
+        command.add("matches", queryCommandResult.getMatches());
+        SolrDocumentList docList = new SolrDocumentList();
+        docList.setNumFound(queryCommandResult.getTopDocs().totalHits);
+        if (!Float.isNaN(queryCommandResult.getTopDocs().getMaxScore())) {
+          docList.setMaxScore(queryCommandResult.getTopDocs().getMaxScore());
+        }
+        docList.setStart(groupingSpecification.getGroupOffset());
+        for (ScoreDoc scoreDoc :queryCommandResult.getTopDocs().scoreDocs){
+          docList.add(solrDocumentSource.retrieve(scoreDoc));
+        }
+        command.add("doclist", docList);
+        commands.add(entry.getKey(), command);
+      }
+    }
+    response.add("grouped", commands);
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/MainEndResultTransformer.java b/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/MainEndResultTransformer.java
new file mode 100644
index 0000000..79a3972
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/MainEndResultTransformer.java
@@ -0,0 +1,59 @@
+package org.apache.solr.search.grouping.endresulttransformer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.grouping.GroupDocs;
+import org.apache.lucene.search.grouping.TopGroups;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.search.grouping.GroupingSpecification;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public class MainEndResultTransformer implements EndResultTransformer {
+
+  public void transform(Map<String, ?> result, SolrQueryResponse response, GroupingSpecification groupingSpecification, SolrDocumentSource solrDocumentSource) {
+    Object value = result.get(groupingSpecification.getFields()[0]);
+    if (TopGroups.class.isInstance(value)) {
+      @SuppressWarnings("unchecked")
+      TopGroups<BytesRef> topGroups = (TopGroups<BytesRef>) value;
+      SolrDocumentList docList = new SolrDocumentList();
+      docList.setStart(groupingSpecification.getOffset());
+      docList.setNumFound(topGroups.totalHitCount);
+
+      Float maxScore = Float.NEGATIVE_INFINITY;
+      for (GroupDocs<BytesRef> group : topGroups.groups) {
+        for (ScoreDoc scoreDoc : group.scoreDocs) {
+          if (maxScore < scoreDoc.score) {
+            maxScore = scoreDoc.score;
+          }
+          docList.add(solrDocumentSource.retrieve(scoreDoc));
+        }
+      }
+      if (maxScore != Float.NEGATIVE_INFINITY) {
+        docList.setMaxScore(maxScore);
+      }
+      response.add("response", docList);
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/SimpleEndResultTransformer.java b/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/SimpleEndResultTransformer.java
new file mode 100644
index 0000000..6f91702
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/grouping/endresulttransformer/SimpleEndResultTransformer.java
@@ -0,0 +1,73 @@
+package org.apache.solr.search.grouping.endresulttransformer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.grouping.GroupDocs;
+import org.apache.lucene.search.grouping.TopGroups;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.search.grouping.GroupingSpecification;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public class SimpleEndResultTransformer implements EndResultTransformer {
+
+  @Override
+  public void transform(Map<String, ?> result, SolrQueryResponse response, GroupingSpecification groupingSpecification, SolrDocumentSource solrDocumentSource) {
+    NamedList<Object> commands = new SimpleOrderedMap<Object>();
+    for (Map.Entry<String, ?> entry : result.entrySet()) {
+      Object value = entry.getValue();
+      if (TopGroups.class.isInstance(value)) {
+        @SuppressWarnings("unchecked")
+        TopGroups<BytesRef> topGroups = (TopGroups<BytesRef>) value;
+        NamedList<Object> command = new SimpleOrderedMap<Object>();
+        command.add("matches", topGroups.totalHitCount);
+        if (topGroups.totalGroupCount != null) {
+          command.add("ngroups", topGroups.totalGroupCount);
+        }
+        SolrDocumentList docList = new SolrDocumentList();
+        docList.setStart(groupingSpecification.getOffset());
+        docList.setNumFound(topGroups.totalHitCount);
+
+        Float maxScore = Float.NEGATIVE_INFINITY;
+        for (GroupDocs<BytesRef> group : topGroups.groups) {
+          for (ScoreDoc scoreDoc : group.scoreDocs) {
+            if (maxScore < scoreDoc.score) {
+              maxScore = scoreDoc.score;
+            }
+            docList.add(solrDocumentSource.retrieve(scoreDoc));
+          }
+        }
+        if (maxScore != Float.NEGATIVE_INFINITY) {
+          docList.setMaxScore(maxScore);
+        }
+        command.add("doclist", docList);
+        commands.add(entry.getKey(), command);
+      }
+    }
+
+    response.add("grouped", commands);
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/TestDistributedGrouping.java b/solr/core/src/test/org/apache/solr/TestDistributedGrouping.java
new file mode 100644
index 0000000..4f782af
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/TestDistributedGrouping.java
@@ -0,0 +1,152 @@
+package org.apache.solr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * TODO? perhaps use:
+ *  http://docs.codehaus.org/display/JETTY/ServletTester
+ * rather then open a real connection?
+ *
+ * @since solr 3.5
+ */
+public class TestDistributedGrouping extends BaseDistributedSearchTestCase {
+
+  String t1="a_t";
+  String s1="1_a_s1";
+  String s2="2_a_s1";
+  String tlong = "other_tl1";
+  String tdate_a = "a_n_tdt";
+  String tdate_b = "b_n_tdt";
+  String oddField="oddField_s";
+
+  public void doTest() throws Exception {
+    del("*:*");
+    commit();
+
+    handle.clear();
+    handle.put("QTime", SKIPVAL);
+    handle.put("timestamp", SKIPVAL);
+    // Test distributed grouping with empty indices
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc");
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc", "hl","true","hl.fl",t1);
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc", "facet", "true", "facet.field", t1);
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc", "stats", "true", "stats.field", tlong);
+    query("q", "kings", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc", "spellcheck", "true", "spellcheck.build", "true", "qt", "spellCheckCompRH");
+
+    indexr(id,1, s1, 100, tlong, 100,t1,"now is the time for all good men",
+           tdate_a, "2010-04-20T11:00:00Z",
+           tdate_b, "2009-08-20T11:00:00Z",
+           "foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);
+    indexr(id,2, s1, 50 , tlong, 50,t1,"to come to the aid of their country.",
+           tdate_a, "2010-05-02T11:00:00Z",
+           tdate_b, "2009-11-02T11:00:00Z");
+    indexr(id,3, s1, 2, tlong, 2,t1,"how now brown cow",
+           tdate_a, "2010-05-03T11:00:00Z");
+    indexr(id,4, s1, -100 ,tlong, 101,
+           t1,"the quick fox jumped over the lazy dog", 
+           tdate_a, "2010-05-03T11:00:00Z",
+           tdate_b, "2010-05-03T11:00:00Z");
+    indexr(id,5, s1, 500, tlong, 500 ,
+           t1,"the quick fox jumped way over the lazy dog", 
+           tdate_a, "2010-05-05T11:00:00Z");
+    indexr(id,6, s1, -600, tlong, 600 ,t1,"humpty dumpy sat on a wall");
+    indexr(id,7, s1, 123, tlong, 123 ,t1,"humpty dumpy had a great fall");
+    indexr(id,8, s1, 876, tlong, 876,
+           tdate_b, "2010-01-05T11:00:00Z",
+           t1,"all the kings horses and all the kings men");
+    indexr(id,9, s1, 7, tlong, 7,t1,"couldn't put humpty together again");
+    indexr(id,10, s1, 4321, tlong, 4321,t1,"this too shall pass");
+    indexr(id,11, s1, -987, tlong, 987,
+           t1,"An eye for eye only ends up making the whole world blind.");
+    indexr(id,12, s1, 379, tlong, 379,
+           t1,"Great works are performed, not by strength, but by perseverance.");
+    indexr(id,13, s1, 232, tlong, 232,
+           t1,"no eggs on wall, lesson learned", 
+           oddField, "odd man out");
+
+    indexr(id, 14, "SubjectTerms_mfacet", new String[]  {"mathematical models", "mathematical analysis"});
+    indexr(id, 15, "SubjectTerms_mfacet", new String[]  {"test 1", "test 2", "test3"});
+    indexr(id, 16, "SubjectTerms_mfacet", new String[]  {"test 1", "test 2", "test3"});
+    String[] vals = new String[100];
+    for (int i=0; i<100; i++) {
+      vals[i] = "test " + i;
+    }
+    indexr(id, 17, "SubjectTerms_mfacet", vals);
+
+    for (int i=100; i<150; i++) {
+      indexr(id, i);      
+    }
+
+    int[] values = new int[]{9999, 99999, 999999, 9999999};
+    for (int shard = 0; shard < clients.size(); shard++) {
+      int groupValue = values[shard];
+      for (int i = 500; i < 600; i++) {
+        index_specific(shard, s1, groupValue, s2, "a", id, i * (shard + 1));
+      }
+    }
+
+    commit();
+
+	  // test grouping
+    // The second sort = id asc . The sorting behaviour is different in dist mode. See TopDocs#merge
+    // The shard the result came from matters in the order if both document sortvalues are equal
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc");
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", "id asc, _docid_ asc");
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", "{!func}add(" + tlong + ",5) asc, id asc");
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc", "facet", "true", "facet.field", t1);
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc", "stats", "true", "stats.field", tlong);
+    query("q", "kings", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc", "spellcheck", "true", "spellcheck.build", "true", "qt", "spellCheckCompRH");
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc", "facet", "true", "hl","true","hl.fl",t1);
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc", "group.sort", "id desc");
+
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.offset", 5, "group.limit", 5, "sort", s1 + " asc, id asc");
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "offset", 5, "rows", 5, "group.offset", 5, "group.limit", 5, "sort", s1 + " asc, id asc");
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "offset", 5, "rows", 5, "sort", s1 + " asc, id asc", "group.format", "simple");
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "offset", 5, "rows", 5, "sort", s1 + " asc, id asc", "group.main", "true");
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.offset", 5, "group.limit", 5, "sort", s1 + " asc, id asc", "group.format", "simple", "offset", 5, "rows", 5);
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.offset", 5, "group.limit", 5, "sort", s1 + " asc, id asc", "group.main", "true", "offset", 5, "rows", 5);
+
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.query", t1 + ":kings OR " + t1 + ":eggs", "group.limit", 10, "sort", s1 + " asc, id asc");
+    query("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.query", t1 + ":kings OR " + t1 + ":eggs", "group.limit", 10, "sort", s1 + " asc, id asc");
+
+    // In order to validate this we need to make sure that during indexing that all documents of one group only occur on the same shard
+    query("q", "*:*", "fq", s2 + ":a", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", s1 + " asc, id asc", "group.ngroups", "true");
+
+    // We cannot validate distributed grouping with scoring as first sort. since there is no global idf. We can check if no errors occur
+    simpleQuery("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10, "sort", "score desc, _docid_ asc, id asc");
+    simpleQuery("q", "*:*", "rows", 100, "fl", "id," + s1, "group", "true", "group.field", s1, "group.limit", 10);
+  }
+
+  private void simpleQuery(Object... queryParams) throws SolrServerException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    for (int i = 0; i < queryParams.length; i += 2) {
+      params.add(queryParams[i].toString(), queryParams[i + 1].toString());
+    }
+    queryServer(params);
+  }
+
+}
diff --git a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
index 069cf47..dbf6557 100644
--- a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
@@ -260,6 +260,14 @@
     for (SolrServer client : clients) client.commit();
   }
 
+  protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException {
+    // query a random server
+    int which = r.nextInt(clients.size());
+    SolrServer client = clients.get(which);
+    QueryResponse rsp = client.query(params);
+    return rsp;
+  }
+
   protected void query(Object... q) throws Exception {
     final ModifiableSolrParams params = new ModifiableSolrParams();
     params.add("reqid",Integer.toString(random.nextInt())); // just to help correlate top-level requests w/ sub requests
@@ -274,9 +282,7 @@
 
     // query a random server
     params.set("shards", shards);
-    int which = r.nextInt(clients.size());
-    SolrServer client = clients.get(which);
-    QueryResponse rsp = client.query(params);
+    QueryResponse rsp = queryServer(params);
 
     compareResponses(rsp, controlRsp);