Merged /lucene/dev/trunk:r1446984-1447123

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/lucene4765@1447125 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesConsumer.java b/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesConsumer.java
index c543186..d020ad4 100644
--- a/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesConsumer.java
+++ b/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesConsumer.java
@@ -59,7 +59,7 @@
   
   @Override
   public void addNumericField(FieldInfo field, Iterable<Number> values) throws IOException {
-    int count = 0;
+    long count = 0;
     for (@SuppressWarnings("unused") Number nv : values) {
       ++count;
     }
@@ -68,7 +68,7 @@
     meta.writeByte(DiskDocValuesFormat.NUMERIC);
     meta.writeVInt(PackedInts.VERSION_CURRENT);
     meta.writeLong(data.getFilePointer());
-    meta.writeVInt(count);
+    meta.writeVLong(count);
     meta.writeVInt(BLOCK_SIZE);
 
     final BlockPackedWriter writer = new BlockPackedWriter(data, BLOCK_SIZE);
@@ -86,7 +86,7 @@
     int minLength = Integer.MAX_VALUE;
     int maxLength = Integer.MIN_VALUE;
     final long startFP = data.getFilePointer();
-    int count = 0;
+    long count = 0;
     for(BytesRef v : values) {
       minLength = Math.min(minLength, v.length);
       maxLength = Math.max(maxLength, v.length);
@@ -95,7 +95,7 @@
     }
     meta.writeVInt(minLength);
     meta.writeVInt(maxLength);
-    meta.writeVInt(count);
+    meta.writeVLong(count);
     meta.writeLong(startFP);
     
     // if minLength == maxLength, it's a fixed-length byte[], we are done (the addresses are implicit)
@@ -124,6 +124,33 @@
   }
   
   @Override
+  public void addSortedSetField(FieldInfo field, Iterable<BytesRef> values, Iterable<Number> docToOrdCount, Iterable<Number> ords) throws IOException {
+    meta.writeVInt(field.number);
+    meta.writeByte(DiskDocValuesFormat.SORTED_SET);
+    // write the ord -> byte[] as a binary field
+    addBinaryField(field, values);
+    // write the stream of ords as a numeric field
+    // NOTE: we could return an iterator that delta-encodes these within a doc
+    addNumericField(field, ords);
+    
+    // write the doc -> ord counts as an absolute (cumulative) index into the ord stream
+    meta.writeVInt(field.number);
+    meta.writeByte(DiskDocValuesFormat.NUMERIC);
+    meta.writeVInt(PackedInts.VERSION_CURRENT);
+    meta.writeLong(data.getFilePointer());
+    meta.writeVLong(maxDoc);
+    meta.writeVInt(BLOCK_SIZE);
+
+    final MonotonicBlockPackedWriter writer = new MonotonicBlockPackedWriter(data, BLOCK_SIZE);
+    long addr = 0;
+    for (Number v : docToOrdCount) {
+      addr += v.longValue();
+      writer.add(addr);
+    }
+    writer.finish();
+  }
+
+  @Override
   public void close() throws IOException {
     boolean success = false;
     try {
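
For intuition, a minimal sketch (illustration only, not part of the patch) of the count-to-cumulative-address transform addSortedSetField performs, and how a reader inverts it to find a document's slice of the ord stream:

    long[] counts = {2, 0, 3};             // ords per document
    long[] addr = new long[counts.length];
    long sum = 0;
    for (int doc = 0; doc < counts.length; doc++) {
      sum += counts[doc];                  // running total, exactly what the writer adds
      addr[doc] = sum;                     // monotonic, hence MonotonicBlockPackedWriter
    }
    // reader side: doc 2's ords live at stream positions [start, end) = [2, 5)
    int doc = 2;
    long start = doc == 0 ? 0 : addr[doc - 1];
    long end = addr[doc];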
diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesFormat.java b/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesFormat.java
index 5e0a676..b9d021a 100644
--- a/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesFormat.java
+++ b/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesFormat.java
@@ -58,4 +58,5 @@
   public static final byte NUMERIC = 0;
   public static final byte BINARY = 1;
   public static final byte SORTED = 2;
+  public static final byte SORTED_SET = 3;
 }
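
With the new tag, a sorted set field's metadata is a composite of the existing entry types. Sketch of the layout that DiskDocValuesConsumer.addSortedSetField writes and DiskDocValuesProducer.readFields validates:

    vInt fieldNumber, byte SORTED_SET (3)
    vInt fieldNumber, byte BINARY (1)   -- ord -> byte[] values
    vInt fieldNumber, byte NUMERIC (0)  -- flat stream of all ords
    vInt fieldNumber, byte NUMERIC (0)  -- doc -> cumulative ord count (monotonic index)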
diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesProducer.java b/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesProducer.java
index da14a5b..de59e78 100644
--- a/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesProducer.java
+++ b/lucene/codecs/src/java/org/apache/lucene/codecs/diskdv/DiskDocValuesProducer.java
@@ -31,6 +31,7 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
@@ -41,11 +42,13 @@
   private final Map<Integer,NumericEntry> numerics;
   private final Map<Integer,BinaryEntry> binaries;
   private final Map<Integer,NumericEntry> ords;
+  private final Map<Integer,NumericEntry> ordIndexes;
   private final IndexInput data;
 
   // memory-resident structures
   private final Map<Integer,BlockPackedReader> ordinalInstances = new HashMap<Integer,BlockPackedReader>();
   private final Map<Integer,MonotonicBlockPackedReader> addressInstances = new HashMap<Integer,MonotonicBlockPackedReader>();
+  private final Map<Integer,MonotonicBlockPackedReader> ordIndexInstances = new HashMap<Integer,MonotonicBlockPackedReader>();
   
   DiskDocValuesProducer(SegmentReadState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) throws IOException {
     String metaName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, metaExtension);
@@ -58,6 +61,7 @@
                                 DiskDocValuesFormat.VERSION_START);
       numerics = new HashMap<Integer,NumericEntry>();
       ords = new HashMap<Integer,NumericEntry>();
+      ordIndexes = new HashMap<Integer,NumericEntry>();
       binaries = new HashMap<Integer,BinaryEntry>();
       readFields(in, state.fieldInfos);
       success = true;
@@ -104,6 +108,36 @@
         }
         NumericEntry n = readNumericEntry(meta);
         ords.put(fieldNumber, n);
+      } else if (type == DiskDocValuesFormat.SORTED_SET) {
+        // sortedset = binary + numeric + ordIndex
+        if (meta.readVInt() != fieldNumber) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        if (meta.readByte() != DiskDocValuesFormat.BINARY) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        BinaryEntry b = readBinaryEntry(meta);
+        binaries.put(fieldNumber, b);
+        
+        if (meta.readVInt() != fieldNumber) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        if (meta.readByte() != DiskDocValuesFormat.NUMERIC) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        NumericEntry n1 = readNumericEntry(meta);
+        ords.put(fieldNumber, n1);
+        
+        if (meta.readVInt() != fieldNumber) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        if (meta.readByte() != DiskDocValuesFormat.NUMERIC) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        NumericEntry n2 = readNumericEntry(meta);
+        ordIndexes.put(fieldNumber, n2);
+      } else {
+        throw new CorruptIndexException("invalid type: " + type + ", resource=" + meta);
       }
       fieldNumber = meta.readVInt();
     }
@@ -113,7 +147,7 @@
     NumericEntry entry = new NumericEntry();
     entry.packedIntsVersion = meta.readVInt();
     entry.offset = meta.readLong();
-    entry.count = meta.readVInt();
+    entry.count = meta.readVLong();
     entry.blockSize = meta.readVInt();
     return entry;
   }
@@ -122,7 +156,7 @@
     BinaryEntry entry = new BinaryEntry();
     entry.minLength = meta.readVInt();
     entry.maxLength = meta.readVInt();
-    entry.count = meta.readVInt();
+    entry.count = meta.readVLong();
     entry.offset = meta.readLong();
     if (entry.minLength != entry.maxLength) {
       entry.addressesOffset = meta.readLong();
@@ -135,14 +169,18 @@
   @Override
   public NumericDocValues getNumeric(FieldInfo field) throws IOException {
     NumericEntry entry = numerics.get(field.number);
+    return getNumeric(entry);
+  }
+  
+  LongNumericDocValues getNumeric(NumericEntry entry) throws IOException {
     final IndexInput data = this.data.clone();
     data.seek(entry.offset);
 
     final BlockPackedReader reader = new BlockPackedReader(data, entry.packedIntsVersion, entry.blockSize, entry.count, true);
-    return new NumericDocValues() {
+    return new LongNumericDocValues() {
       @Override
-      public long get(int docID) {
-        return reader.get(docID);
+      public long get(long id) {
+        return reader.get(id);
       }
     };
   }
@@ -160,10 +198,10 @@
   private BinaryDocValues getFixedBinary(FieldInfo field, final BinaryEntry bytes) {
     final IndexInput data = this.data.clone();
 
-    return new BinaryDocValues() {
+    return new LongBinaryDocValues() {
       @Override
-      public void get(int docID, BytesRef result) {
-        long address = bytes.offset + docID * (long)bytes.maxLength;
+      public void get(long id, BytesRef result) {
+        long address = bytes.offset + id * bytes.maxLength;
         try {
           data.seek(address);
           // NOTE: we could have one buffer, but various consumers (e.g. FieldComparatorSource) 
@@ -194,11 +232,11 @@
       addresses = addrInstance;
     }
 
-    return new BinaryDocValues() {
+    return new LongBinaryDocValues() {
       @Override
-      public void get(int docID, BytesRef result) {
-        long startAddress = bytes.offset + (docID == 0 ? 0 : + addresses.get(docID-1));
-        long endAddress = bytes.offset + addresses.get(docID);
+      public void get(long id, BytesRef result) {
+        long startAddress = bytes.offset + (id == 0 ? 0 : addresses.get(id-1));
+        long endAddress = bytes.offset + addresses.get(id);
         int length = (int) (endAddress - startAddress);
         try {
           data.seek(startAddress);
@@ -218,7 +256,7 @@
 
   @Override
   public SortedDocValues getSorted(FieldInfo field) throws IOException {
-    final int valueCount = binaries.get(field.number).count;
+    final int valueCount = (int) binaries.get(field.number).count;
     final BinaryDocValues binary = getBinary(field);
     final BlockPackedReader ordinals;
     synchronized (ordinalInstances) {
@@ -252,6 +290,59 @@
   }
 
   @Override
+  public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
+    final long valueCount = binaries.get(field.number).count;
+    // we keep the byte[]s and list of ords on disk; these could be large
+    final LongBinaryDocValues binary = (LongBinaryDocValues) getBinary(field);
+    final LongNumericDocValues ordinals = getNumeric(ords.get(field.number));
+    // but the addresses to the ord stream are in RAM
+    final MonotonicBlockPackedReader ordIndex;
+    synchronized (ordIndexInstances) {
+      MonotonicBlockPackedReader ordIndexInstance = ordIndexInstances.get(field.number);
+      if (ordIndexInstance == null) {
+        NumericEntry entry = ordIndexes.get(field.number);
+        IndexInput data = this.data.clone();
+        data.seek(entry.offset);
+        ordIndexInstance = new MonotonicBlockPackedReader(data, entry.packedIntsVersion, entry.blockSize, entry.count, false);
+        ordIndexInstances.put(field.number, ordIndexInstance);
+      }
+      ordIndex = ordIndexInstance;
+    }
+    
+    return new SortedSetDocValues() {
+      long offset;
+      long endOffset;
+      
+      @Override
+      public long nextOrd() {
+        if (offset == endOffset) {
+          return NO_MORE_ORDS;
+        } else {
+          long ord = ordinals.get(offset);
+          offset++;
+          return ord;
+        }
+      }
+
+      @Override
+      public void setDocument(int docID) {
+        offset = (docID == 0 ? 0 : ordIndex.get(docID-1));
+        endOffset = ordIndex.get(docID);
+      }
+
+      @Override
+      public void lookupOrd(long ord, BytesRef result) {
+        binary.get(ord, result);
+      }
+
+      @Override
+      public long getValueCount() {
+        return valueCount;
+      }
+    };
+  }
+
+  @Override
   public void close() throws IOException {
     data.close();
   }
@@ -260,18 +351,37 @@
     long offset;
 
     int packedIntsVersion;
-    int count;
+    long count;
     int blockSize;
   }
   
   static class BinaryEntry {
     long offset;
 
-    int count;
+    long count;
     int minLength;
     int maxLength;
     long addressesOffset;
     int packedIntsVersion;
     int blockSize;
   }
+  
+  // internally we compose complex dv (sorted/sortedset) from other ones
+  static abstract class LongNumericDocValues extends NumericDocValues {
+    @Override
+    public final long get(int docID) {
+      return get((long) docID);
+    }
+    
+    abstract long get(long id);
+  }
+  
+  static abstract class LongBinaryDocValues extends BinaryDocValues {
+    @Override
+    public final void get(int docID, BytesRef result) {
+      get((long)docID, result);
+    }
+    
+    abstract void get(long id, BytesRef result);
+  }
 }
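
A hedged usage sketch of the per-document API the producer now exposes; producer, fieldInfo, and docID are assumed to be in scope and are not part of the patch:

    SortedSetDocValues dv = producer.getSortedSet(fieldInfo);
    BytesRef term = new BytesRef();
    dv.setDocument(docID);                 // position on one document
    long ord;
    while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
      dv.lookupOrd(ord, term);             // resolve ord -> byte[]
      System.out.println(term.utf8ToString());
    }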
diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesFormat.java b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesFormat.java
index d692705..02557c9 100644
--- a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesFormat.java
+++ b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesFormat.java
@@ -82,6 +82,31 @@
  *  so the "ord section" begins at startOffset + (9+pattern.length+maxlength)*numValues.
  *  a document's ord can be retrieved by seeking to "ord section" + (1+ordpattern.length())*docid
  *  an ord's value can be retrieved by seeking to startOffset + (9+pattern.length+maxlength)*ord
+ *  
+ *  for sorted set this is a fixed-width file very similar to the SORTED case, for example:
+ *  <pre>
+ *  field myField
+ *    type SORTED_SET
+ *    numvalues 10
+ *    maxLength 8
+ *    pattern 0
+ *    ordpattern XXXXX
+ *  length 6
+ *  foobar[space][space]
+ *  length 3
+ *  baz[space][space][space][space][space]
+ *  ...
+ *  0,3,5   
+ *  1,2
+ *  
+ *  10
+ *  ...
+ *  </pre>
+ *  so the "ord section" begins at startOffset + (9+pattern.length+maxlength)*numValues.
+ *  a document's ord list can be retrieved by seeking to "ord section" + (1+ordpattern.length())*docid
+ *  this is a comma-separated list, padded with spaces to be fixed width; so trim() and split() it,
+ *  and beware the empty string!
+ *  an ord's value can be retrieved by seeking to startOffset + (9+pattern.length+maxlength)*ord
  *   
  *  the reader can just scan this file when it opens, skipping over the data blocks
  *  and saving the offset/etc for each field. 
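
To make the seek arithmetic concrete with the header from the example above (numvalues 10, maxLength 8, pattern of length 1, ordpattern of length 5): each value block takes 9 + 1 + 8 = 18 bytes, so the ord section starts at startOffset + 18 * 10 = startOffset + 180, and doc 3's ord list is at startOffset + 180 + (1 + 5) * 3 = startOffset + 198.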
diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java
index 62eeab9..31d4009 100644
--- a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java
+++ b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java
@@ -36,6 +36,7 @@
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.index.FieldInfo.DocValuesType;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.StringHelper;
@@ -59,7 +60,7 @@
     int maxLength;
     boolean fixedLength;
     long minValue;
-    int numValues;
+    long numValues;
   };
 
   final int maxDoc;
@@ -109,10 +110,10 @@
         field.pattern = stripPrefix(PATTERN);
         field.dataStartFilePointer = data.getFilePointer();
         data.seek(data.getFilePointer() + (9+field.pattern.length()+field.maxLength) * maxDoc);
-      } else if (dvType == DocValuesType.SORTED) {
+      } else if (dvType == DocValuesType.SORTED || dvType == DocValuesType.SORTED_SET) {
         readLine();
         assert startsWith(NUMVALUES);
-        field.numValues = Integer.parseInt(stripPrefix(NUMVALUES));
+        field.numValues = Long.parseLong(stripPrefix(NUMVALUES));
         readLine();
         assert startsWith(MAXLENGTH);
         field.maxLength = Integer.parseInt(stripPrefix(MAXLENGTH));
@@ -279,6 +280,84 @@
 
       @Override
       public int getValueCount() {
+        return (int)field.numValues;
+      }
+    };
+  }
+
+  @Override
+  public SortedSetDocValues getSortedSet(FieldInfo fieldInfo) throws IOException {
+    final OneField field = fields.get(fieldInfo.name);
+
+    // SegmentCoreReaders already verifies this field is
+    // valid:
+    assert field != null;
+
+    final IndexInput in = data.clone();
+    final BytesRef scratch = new BytesRef();
+    final DecimalFormat decoder = new DecimalFormat(field.pattern, new DecimalFormatSymbols(Locale.ROOT));
+    
+    return new SortedSetDocValues() {
+      String[] currentOrds = new String[0];
+      int currentIndex = 0;
+      
+      @Override
+      public long nextOrd() {
+        if (currentIndex == currentOrds.length) {
+          return NO_MORE_ORDS;
+        } else {
+          return Long.parseLong(currentOrds[currentIndex++]);
+        }
+      }
+
+      @Override
+      public void setDocument(int docID) {
+        if (docID < 0 || docID >= maxDoc) {
+          throw new IndexOutOfBoundsException("docID must be 0 .. " + (maxDoc-1) + "; got " + docID);
+        }
+        try {
+          in.seek(field.dataStartFilePointer + field.numValues * (9 + field.pattern.length() + field.maxLength) + docID * (1 + field.ordPattern.length()));
+          SimpleTextUtil.readLine(in, scratch);
+          String ordList = scratch.utf8ToString().trim();
+          if (ordList.isEmpty()) {
+            currentOrds = new String[0];
+          } else {
+            currentOrds = ordList.split(",");
+          }
+          currentIndex = 0;
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe);
+        }
+      }
+
+      @Override
+      public void lookupOrd(long ord, BytesRef result) {
+        try {
+          if (ord < 0 || ord >= field.numValues) {
+            throw new IndexOutOfBoundsException("ord must be 0 .. " + (field.numValues-1) + "; got " + ord);
+          }
+          in.seek(field.dataStartFilePointer + ord * (9 + field.pattern.length() + field.maxLength));
+          SimpleTextUtil.readLine(in, scratch);
+          assert StringHelper.startsWith(scratch, LENGTH): "got " + scratch.utf8ToString() + " in=" + in;
+          int len;
+          try {
+            len = decoder.parse(new String(scratch.bytes, scratch.offset + LENGTH.length, scratch.length - LENGTH.length, "UTF-8")).intValue();
+          } catch (ParseException pe) {
+            CorruptIndexException e = new CorruptIndexException("failed to parse int length");
+            e.initCause(pe);
+            throw e;
+          }
+          result.bytes = new byte[len];
+          result.offset = 0;
+          result.length = len;
+          in.readBytes(result.bytes, 0, len);
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe);
+        }
+      }
+
+      @Override
+      public long getValueCount() {
         return field.numValues;
       }
     };
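
A small sketch of the fixed-width ord-line decoding that setDocument above performs; the hardcoded line stands in for what SimpleTextUtil.readLine returns:

    String line = "0,3,5   ";              // space-padded to the ordpattern width
    String trimmed = line.trim();          // "0,3,5"
    String[] parts = trimmed.isEmpty() ? new String[0] : trimmed.split(",");
    long[] ords = new long[parts.length];
    for (int i = 0; i < parts.length; i++) {
      ords[i] = Long.parseLong(parts[i]);  // {0, 3, 5}; an empty line means no values
    }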
diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesWriter.java b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesWriter.java
index 7e59036..2f86255c 100644
--- a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesWriter.java
+++ b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesWriter.java
@@ -22,6 +22,7 @@
 import java.text.DecimalFormat;
 import java.text.DecimalFormatSymbols;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Locale;
 import java.util.Set;
 
@@ -250,6 +251,115 @@
     }
   }
 
+  @Override
+  public void addSortedSetField(FieldInfo field, Iterable<BytesRef> values, Iterable<Number> docToOrdCount, Iterable<Number> ords) throws IOException {
+    assert fieldSeen(field.name);
+    assert field.getDocValuesType() == DocValuesType.SORTED_SET;
+    writeFieldEntry(field, FieldInfo.DocValuesType.SORTED_SET);
+
+    long valueCount = 0;
+    int maxLength = 0;
+    for(BytesRef value : values) {
+      maxLength = Math.max(maxLength, value.length);
+      valueCount++;
+    }
+
+    // write numValues
+    SimpleTextUtil.write(data, NUMVALUES);
+    SimpleTextUtil.write(data, Long.toString(valueCount), scratch);
+    SimpleTextUtil.writeNewline(data);
+    
+    // write maxLength
+    SimpleTextUtil.write(data, MAXLENGTH);
+    SimpleTextUtil.write(data, Integer.toString(maxLength), scratch);
+    SimpleTextUtil.writeNewline(data);
+    
+    int maxBytesLength = Integer.toString(maxLength).length();
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < maxBytesLength; i++) {
+      sb.append('0');
+    }
+    
+    // write our pattern for encoding lengths
+    SimpleTextUtil.write(data, PATTERN);
+    SimpleTextUtil.write(data, sb.toString(), scratch);
+    SimpleTextUtil.writeNewline(data);
+    final DecimalFormat encoder = new DecimalFormat(sb.toString(), new DecimalFormatSymbols(Locale.ROOT));
+    
+    // compute ord pattern: this is funny; we encode all values for all docs to find the maximum length
+    int maxOrdListLength = 0;
+    StringBuilder sb2 = new StringBuilder();
+    Iterator<Number> ordStream = ords.iterator();
+    for (Number n : docToOrdCount) {
+      sb2.setLength(0);
+      int count = n.intValue();
+      for (int i = 0; i < count; i++) {
+        long ord = ordStream.next().longValue();
+        if (sb2.length() > 0) {
+          sb2.append(",");
+        }
+        sb2.append(Long.toString(ord));
+      }
+      maxOrdListLength = Math.max(maxOrdListLength, sb2.length());
+    }
+     
+    sb2.setLength(0);
+    for (int i = 0; i < maxOrdListLength; i++) {
+      sb2.append('X');
+    }
+    
+    // write our pattern for ord lists
+    SimpleTextUtil.write(data, ORDPATTERN);
+    SimpleTextUtil.write(data, sb2.toString(), scratch);
+    SimpleTextUtil.writeNewline(data);
+    
+    // for asserts:
+    long valuesSeen = 0;
+
+    for(BytesRef value : values) {
+      // write length
+      SimpleTextUtil.write(data, LENGTH);
+      SimpleTextUtil.write(data, encoder.format(value.length), scratch);
+      SimpleTextUtil.writeNewline(data);
+        
+      // write bytes -- don't use SimpleText.write
+      // because it escapes:
+      data.writeBytes(value.bytes, value.offset, value.length);
+
+      // pad to fit
+      for (int i = value.length; i < maxLength; i++) {
+        data.writeByte((byte)' ');
+      }
+      SimpleTextUtil.writeNewline(data);
+      valuesSeen++;
+      assert valuesSeen <= valueCount;
+    }
+
+    assert valuesSeen == valueCount;
+
+    ordStream = ords.iterator();
+    
+    // write the ords for each doc comma-separated
+    for(Number n : docToOrdCount) {
+      sb2.setLength(0);
+      int count = n.intValue();
+      for (int i = 0; i < count; i++) {
+        long ord = ordStream.next().longValue();
+        if (sb2.length() > 0) {
+          sb2.append(",");
+        }
+        sb2.append(Long.toString(ord));
+      }
+      // now pad to fit: these are numbers so spaces work well. reader calls trim()
+      int numPadding = maxOrdListLength - sb2.length();
+      for (int i = 0; i < numPadding; i++) {
+        sb2.append(' ');
+      }
+      SimpleTextUtil.write(data, sb2.toString(), scratch);
+      SimpleTextUtil.writeNewline(data);
+    }
+  }
+
   /** write the header for this field */
   private void writeFieldEntry(FieldInfo field, FieldInfo.DocValuesType type) throws IOException {
     SimpleTextUtil.write(data, FIELD);
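
For reference, a hedged sketch of the input contract addSortedSetField consumes; consumer and fieldInfo are placeholders. Two documents, doc 0 = {bar, foo} and doc 1 = {foo}:

    List<BytesRef> values = Arrays.asList(              // deduplicated and sorted:
        new BytesRef("bar"), new BytesRef("foo"));      // ord 0 = bar, ord 1 = foo
    List<Number> docToOrdCount = Arrays.<Number>asList(2, 1);  // values per document
    List<Number> ords = Arrays.<Number>asList(0L, 1L, 1L);     // per-doc ord lists, concatenated
    consumer.addSortedSetField(fieldInfo, values, docToOrdCount, ords);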
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/DocValuesConsumer.java b/lucene/core/src/java/org/apache/lucene/codecs/DocValuesConsumer.java
index 6780626..1db957b 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/DocValuesConsumer.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/DocValuesConsumer.java
@@ -19,7 +19,6 @@
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -27,15 +26,20 @@
 import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.BinaryDocValues;
 import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FilteredTermsEnum;
 import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.MultiDocValues.OrdinalMap;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SegmentWriteState;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedDocValuesTermsEnum;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.index.SortedSetDocValuesTermsEnum;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.FixedBitSet;
-import org.apache.lucene.util.PriorityQueue;
-import org.apache.lucene.util.packed.AppendingLongBuffer;
+import org.apache.lucene.util.OpenBitSet;
 
 /** 
  * Abstract API that consumes numeric, binary and
@@ -90,6 +94,16 @@
   public abstract void addSortedField(FieldInfo field, Iterable<BytesRef> values, Iterable<Number> docToOrd) throws IOException;
 
   /**
+   * Writes pre-sorted set docvalues for a field
+   * @param field field information
+   * @param values Iterable of binary values in sorted order (deduplicated).
+   * @param docToOrdCount Iterable of the number of values for each document. 
+   * @param ords Iterable of ordinal occurrences (the sum of the docToOrdCount values in total).
+   * @throws IOException if an I/O error occurred.
+   */
+  public abstract void addSortedSetField(FieldInfo field, Iterable<BytesRef> values, Iterable<Number> docToOrdCount, Iterable<Number> ords) throws IOException;
+  
+  /**
    * Merges the numeric docvalues from <code>toMerge</code>.
    * <p>
    * The default implementation calls {@link #addNumericField}, passing
@@ -237,134 +251,6 @@
                    });
   }
 
-  static class SortedBytesMerger {
-
-    public int numMergedTerms;
-
-    final AppendingLongBuffer ordToReaderId = new AppendingLongBuffer();
-    final List<SegmentState> segStates = new ArrayList<SegmentState>();
-
-    private static class SegmentState {
-      int segmentID;
-      AtomicReader reader;
-      FixedBitSet liveTerms;
-      int ord = -1;
-      SortedDocValues values;
-      BytesRef scratch = new BytesRef();
-      AppendingLongBuffer ordDeltas = new AppendingLongBuffer();
-
-      // TODO: use another scheme?
-      // currently we +/- delta merged-ord from segment-ord (is this good? makes sense to me?)
-      // but we have a good idea "roughly" what
-      // the ord should be (linear projection) so we only
-      // need to encode the delta from that ...:        
-      AppendingLongBuffer segOrdToMergedOrd = new AppendingLongBuffer();
-
-      public BytesRef nextTerm() {
-        while (ord < values.getValueCount()-1) {
-          ord++;
-          if (liveTerms == null || liveTerms.get(ord)) {
-            values.lookupOrd(ord, scratch);
-            return scratch;
-          }
-        }
-
-        return null;
-      }
-    }
-
-    private static class TermMergeQueue extends PriorityQueue<SegmentState> {
-      public TermMergeQueue(int maxSize) {
-        super(maxSize);
-      }
-
-      @Override
-      protected boolean lessThan(SegmentState a, SegmentState b) {
-        return a.scratch.compareTo(b.scratch) <= 0;
-      }
-    }
-
-    public void merge(MergeState mergeState, List<SortedDocValues> toMerge) throws IOException {
-
-      // First pass: mark "live" terms
-      for (int readerIDX=0;readerIDX<toMerge.size();readerIDX++) {
-        AtomicReader reader = mergeState.readers.get(readerIDX);      
-        int maxDoc = reader.maxDoc();
-
-        SegmentState state = new SegmentState();
-        state.segmentID = readerIDX;
-        state.reader = reader;
-        state.values = toMerge.get(readerIDX);
-
-        segStates.add(state);
-        assert state.values.getValueCount() < Integer.MAX_VALUE;
-        if (reader.hasDeletions()) {
-          state.liveTerms = new FixedBitSet(state.values.getValueCount());
-          Bits liveDocs = reader.getLiveDocs();
-          assert liveDocs != null;
-          for(int docID=0;docID<maxDoc;docID++) {
-            if (liveDocs.get(docID)) {
-              state.liveTerms.set(state.values.getOrd(docID));
-            }
-          }
-        }
-
-        // TODO: we can unload the bits/packed ints to disk to reduce
-        // transient ram spike... most of these just require iterators
-      }
-
-      // Second pass: merge only the live terms
-
-      TermMergeQueue q = new TermMergeQueue(segStates.size());
-      for(SegmentState segState : segStates) {
-        if (segState.nextTerm() != null) {
-          q.add(segState);
-        }
-      }
-
-      int lastOrds[] = new int[segStates.size()];
-      BytesRef lastTerm = null;
-      int ord = 0;
-      while (q.size() != 0) {
-        SegmentState top = q.top();
-        if (lastTerm == null || !lastTerm.equals(top.scratch)) {
-          // a new unique term: record its segment ID / sourceOrd pair
-          int readerId = top.segmentID;
-          ordToReaderId.add(readerId);
-
-          int sourceOrd = top.ord;             
-          int delta = sourceOrd - lastOrds[readerId];
-          lastOrds[readerId] = sourceOrd;
-          top.ordDeltas.add(delta);
-          
-          if (lastTerm == null) {
-            lastTerm = BytesRef.deepCopyOf(top.scratch);
-          } else {
-            lastTerm.copyBytes(top.scratch);
-          }
-          ord++;
-        }
-
-        long signedDelta = (ord-1) - top.ord; // global ord space - segment ord space
-        // fill in any holes for unused ords, then finally the value we want (segOrdToMergedOrd[top.ord])
-        // TODO: is there a better way...
-        while (top.segOrdToMergedOrd.size() <= top.ord) {
-          top.segOrdToMergedOrd.add(signedDelta);
-        }
-        if (top.nextTerm() == null) {
-          q.pop();
-        } else {
-          q.updateTop();
-        }
-      }
-
-      numMergedTerms = ord;
-      // clear our bitsets for GC: we dont need them anymore (e.g. while flushing merged stuff to codec)
-      for (SegmentState state : segStates) {
-        state.liveTerms = null;
-      }
-    }
-  }
 
   /**
    * Merges the sorted docvalues from <code>toMerge</code>.
@@ -373,121 +259,370 @@
   * an Iterable that merges ordinals and values and filters deleted documents.
    */
   public void mergeSortedField(FieldInfo fieldInfo, final MergeState mergeState, List<SortedDocValues> toMerge) throws IOException {
-    final SortedBytesMerger merger = new SortedBytesMerger();
-
-    // Does the heavy lifting to merge sort all "live" ords:
-    merger.merge(mergeState, toMerge);
-
+    final AtomicReader readers[] = mergeState.readers.toArray(new AtomicReader[toMerge.size()]);
+    final SortedDocValues dvs[] = toMerge.toArray(new SortedDocValues[toMerge.size()]);
+    
+    // step 1: iterate thru each sub and mark terms still in use
+    TermsEnum liveTerms[] = new TermsEnum[dvs.length];
+    for (int sub = 0; sub < liveTerms.length; sub++) {
+      AtomicReader reader = readers[sub];
+      SortedDocValues dv = dvs[sub];
+      Bits liveDocs = reader.getLiveDocs();
+      if (liveDocs == null) {
+        liveTerms[sub] = new SortedDocValuesTermsEnum(dv);
+      } else {
+        OpenBitSet bitset = new OpenBitSet(dv.getValueCount());
+        for (int i = 0; i < reader.maxDoc(); i++) {
+          if (liveDocs.get(i)) {
+            bitset.set(dv.getOrd(i));
+          }
+        }
+        liveTerms[sub] = new BitsFilteredTermsEnum(new SortedDocValuesTermsEnum(dv), bitset);
+      }
+    }
+    
+    // step 2: create ordinal map (this conceptually does the "merging")
+    final OrdinalMap map = new OrdinalMap(this, liveTerms);
+    
+    // step 3: add field
     addSortedField(fieldInfo,
+        // ord -> value
+        new Iterable<BytesRef>() {
+          @Override
+          public Iterator<BytesRef> iterator() {
+            return new Iterator<BytesRef>() {
+              final BytesRef scratch = new BytesRef();
+              int currentOrd;
 
-                   // ord -> value
-                   new Iterable<BytesRef>() {
-                     @Override
-                     public Iterator<BytesRef> iterator() {
-                       // for each next(), tells us what reader to go to
-                       final AppendingLongBuffer.Iterator readerIDs = merger.ordToReaderId.iterator();
-                       // for each next(), gives us the original ord
-                       final AppendingLongBuffer.Iterator ordDeltas[] = new AppendingLongBuffer.Iterator[merger.segStates.size()];
-                       final int lastOrds[] = new int[ordDeltas.length];
-                       
-                       for (int i = 0; i < ordDeltas.length; i++) {
-                         ordDeltas[i] = merger.segStates.get(i).ordDeltas.iterator();
-                       }
+              @Override
+              public boolean hasNext() {
+                return currentOrd < map.getValueCount();
+              }
 
-                       final BytesRef scratch = new BytesRef();
-                       
-                       return new Iterator<BytesRef>() {
-                         int ordUpto;
+              @Override
+              public BytesRef next() {
+                if (!hasNext()) {
+                  throw new NoSuchElementException();
+                }
+                int segmentNumber = map.getSegmentNumber(currentOrd);
+                int segmentOrd = (int)map.getSegmentOrd(segmentNumber, currentOrd);
+                dvs[segmentNumber].lookupOrd(segmentOrd, scratch);
+                currentOrd++;
+                return scratch;
+              }
 
-                         @Override
-                         public boolean hasNext() {
-                           return ordUpto < merger.numMergedTerms;
-                         }
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException();
+              }
+            };
+          }
+        },
+        // doc -> ord
+        new Iterable<Number>() {
+          @Override
+          public Iterator<Number> iterator() {
+            return new Iterator<Number>() {
+              int readerUpto = -1;
+              int docIDUpto;
+              int nextValue;
+              AtomicReader currentReader;
+              Bits currentLiveDocs;
+              boolean nextIsSet;
 
-                         @Override
-                         public void remove() {
-                           throw new UnsupportedOperationException();
-                         }
+              @Override
+              public boolean hasNext() {
+                return nextIsSet || setNext();
+              }
 
-                         @Override
-                         public BytesRef next() {
-                           if (!hasNext()) {
-                             throw new NoSuchElementException();
-                           }
-                           int readerID = (int) readerIDs.next();
-                           int ord = lastOrds[readerID] + (int) ordDeltas[readerID].next();
-                           merger.segStates.get(readerID).values.lookupOrd(ord, scratch);
-                           lastOrds[readerID] = ord;
-                           ordUpto++;
-                           return scratch;
-                         }
-                       };
-                     }
-                   },
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException();
+              }
 
-                   // doc -> ord
-                    new Iterable<Number>() {
-                      @Override
-                      public Iterator<Number> iterator() {
-                        return new Iterator<Number>() {
-                          int readerUpto = -1;
-                          int docIDUpto;
-                          int nextValue;
-                          SortedBytesMerger.SegmentState currentReader;
-                          Bits currentLiveDocs;
-                          boolean nextIsSet;
+              @Override
+              public Number next() {
+                if (!hasNext()) {
+                  throw new NoSuchElementException();
+                }
+                assert nextIsSet;
+                nextIsSet = false;
+                // TODO make a mutable number
+                return nextValue;
+              }
 
-                          @Override
-                          public boolean hasNext() {
-                            return nextIsSet || setNext();
-                          }
+              private boolean setNext() {
+                while (true) {
+                  if (readerUpto == readers.length) {
+                    return false;
+                  }
 
-                          @Override
-                          public void remove() {
-                            throw new UnsupportedOperationException();
-                          }
+                  if (currentReader == null || docIDUpto == currentReader.maxDoc()) {
+                    readerUpto++;
+                    if (readerUpto < readers.length) {
+                      currentReader = readers[readerUpto];
+                      currentLiveDocs = currentReader.getLiveDocs();
+                    }
+                    docIDUpto = 0;
+                    continue;
+                  }
 
-                          @Override
-                          public Number next() {
-                            if (!hasNext()) {
-                              throw new NoSuchElementException();
-                            }
-                            assert nextIsSet;
-                            nextIsSet = false;
-                            // TODO make a mutable number
-                            return nextValue;
-                          }
+                  if (currentLiveDocs == null || currentLiveDocs.get(docIDUpto)) {
+                    nextIsSet = true;
+                    int segOrd = dvs[readerUpto].getOrd(docIDUpto);
+                    nextValue = (int) map.getGlobalOrd(readerUpto, segOrd);
+                    docIDUpto++;
+                    return true;
+                  }
 
-                          private boolean setNext() {
-                            while (true) {
-                              if (readerUpto == merger.segStates.size()) {
-                                return false;
-                              }
+                  docIDUpto++;
+                }
+              }
+            };
+          }
+        }
+    );
+  }
+  
+  /**
+   * Merges the sortedset docvalues from <code>toMerge</code>.
+   * <p>
+   * The default implementation calls {@link #addSortedSetField}, passing
+   * an Iterable that merges ordinals and values and filters deleted documents.
+   */
+  public void mergeSortedSetField(FieldInfo fieldInfo, final MergeState mergeState, List<SortedSetDocValues> toMerge) throws IOException {
+    final AtomicReader readers[] = mergeState.readers.toArray(new AtomicReader[toMerge.size()]);
+    final SortedSetDocValues dvs[] = toMerge.toArray(new SortedSetDocValues[toMerge.size()]);
+    
+    // step 1: iterate thru each sub and mark terms still in use
+    TermsEnum liveTerms[] = new TermsEnum[dvs.length];
+    for (int sub = 0; sub < liveTerms.length; sub++) {
+      AtomicReader reader = readers[sub];
+      SortedSetDocValues dv = dvs[sub];
+      Bits liveDocs = reader.getLiveDocs();
+      if (liveDocs == null) {
+        liveTerms[sub] = new SortedSetDocValuesTermsEnum(dv);
+      } else {
+        OpenBitSet bitset = new OpenBitSet(dv.getValueCount());
+        for (int i = 0; i < reader.maxDoc(); i++) {
+          if (liveDocs.get(i)) {
+            dv.setDocument(i);
+            long ord;
+            while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+              bitset.set(ord);
+            }
+          }
+        }
+        liveTerms[sub] = new BitsFilteredTermsEnum(new SortedSetDocValuesTermsEnum(dv), bitset);
+      }
+    }
+    
+    // step 2: create ordinal map (this conceptually does the "merging")
+    final OrdinalMap map = new OrdinalMap(this, liveTerms);
+    
+    // step 3: add field
+    addSortedSetField(fieldInfo,
+        // ord -> value
+        new Iterable<BytesRef>() {
+          @Override
+          public Iterator<BytesRef> iterator() {
+            return new Iterator<BytesRef>() {
+              final BytesRef scratch = new BytesRef();
+              long currentOrd;
 
-                              if (currentReader == null || docIDUpto == currentReader.reader.maxDoc()) {
-                                readerUpto++;
-                                if (readerUpto < merger.segStates.size()) {
-                                  currentReader = merger.segStates.get(readerUpto);
-                                  currentLiveDocs = currentReader.reader.getLiveDocs();
-                                }
-                                docIDUpto = 0;
-                                continue;
-                              }
+              @Override
+              public boolean hasNext() {
+                return currentOrd < map.getValueCount();
+              }
 
-                              if (currentLiveDocs == null || currentLiveDocs.get(docIDUpto)) {
-                                nextIsSet = true;
-                                int segOrd = currentReader.values.getOrd(docIDUpto);
-                                nextValue = (int) (segOrd + currentReader.segOrdToMergedOrd.get(segOrd));
-                                docIDUpto++;
-                                return true;
-                              }
+              @Override
+              public BytesRef next() {
+                if (!hasNext()) {
+                  throw new NoSuchElementException();
+                }
+                int segmentNumber = map.getSegmentNumber(currentOrd);
+                long segmentOrd = map.getSegmentOrd(segmentNumber, currentOrd);
+                dvs[segmentNumber].lookupOrd(segmentOrd, scratch);
+                currentOrd++;
+                return scratch;
+              }
 
-                              docIDUpto++;
-                            }
-                          }
-                        };
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException();
+              }
+            };
+          }
+        },
+        // doc -> ord count
+        new Iterable<Number>() {
+          @Override
+          public Iterator<Number> iterator() {
+            return new Iterator<Number>() {
+              int readerUpto = -1;
+              int docIDUpto;
+              int nextValue;
+              AtomicReader currentReader;
+              Bits currentLiveDocs;
+              boolean nextIsSet;
+
+              @Override
+              public boolean hasNext() {
+                return nextIsSet || setNext();
+              }
+
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public Number next() {
+                if (!hasNext()) {
+                  throw new NoSuchElementException();
+                }
+                assert nextIsSet;
+                nextIsSet = false;
+                // TODO make a mutable number
+                return nextValue;
+              }
+
+              private boolean setNext() {
+                while (true) {
+                  if (readerUpto == readers.length) {
+                    return false;
+                  }
+
+                  if (currentReader == null || docIDUpto == currentReader.maxDoc()) {
+                    readerUpto++;
+                    if (readerUpto < readers.length) {
+                      currentReader = readers[readerUpto];
+                      currentLiveDocs = currentReader.getLiveDocs();
+                    }
+                    docIDUpto = 0;
+                    continue;
+                  }
+
+                  if (currentLiveDocs == null || currentLiveDocs.get(docIDUpto)) {
+                    nextIsSet = true;
+                    SortedSetDocValues dv = dvs[readerUpto];
+                    dv.setDocument(docIDUpto);
+                    nextValue = 0;
+                    while (dv.nextOrd() != SortedSetDocValues.NO_MORE_ORDS) {
+                      nextValue++;
+                    }
+                    docIDUpto++;
+                    return true;
+                  }
+
+                  docIDUpto++;
+                }
+              }
+            };
+          }
+        },
+        // ords
+        new Iterable<Number>() {
+          @Override
+          public Iterator<Number> iterator() {
+            return new Iterator<Number>() {
+              int readerUpto = -1;
+              int docIDUpto;
+              long nextValue;
+              AtomicReader currentReader;
+              Bits currentLiveDocs;
+              boolean nextIsSet;
+              long ords[] = new long[8];
+              int ordUpto;
+              int ordLength;
+
+              @Override
+              public boolean hasNext() {
+                return nextIsSet || setNext();
+              }
+
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public Number next() {
+                if (!hasNext()) {
+                  throw new NoSuchElementException();
+                }
+                assert nextIsSet;
+                nextIsSet = false;
+                // TODO make a mutable number
+                return nextValue;
+              }
+
+              private boolean setNext() {
+                while (true) {
+                  if (readerUpto == readers.length) {
+                    return false;
+                  }
+                  
+                  if (ordUpto < ordLength) {
+                    nextValue = ords[ordUpto];
+                    ordUpto++;
+                    nextIsSet = true;
+                    return true;
+                  }
+
+                  if (currentReader == null || docIDUpto == currentReader.maxDoc()) {
+                    readerUpto++;
+                    if (readerUpto < readers.length) {
+                      currentReader = readers[readerUpto];
+                      currentLiveDocs = currentReader.getLiveDocs();
+                    }
+                    docIDUpto = 0;
+                    continue;
+                  }
+                  
+                  if (currentLiveDocs == null || currentLiveDocs.get(docIDUpto)) {
+                    assert docIDUpto < currentReader.maxDoc();
+                    SortedSetDocValues dv = dvs[readerUpto];
+                    dv.setDocument(docIDUpto);
+                    ordUpto = ordLength = 0;
+                    long ord;
+                    while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+                      if (ordLength == ords.length) {
+                        ords = ArrayUtil.grow(ords, ordLength+1);
                       }
-                    });
+                      ords[ordLength] = map.getGlobalOrd(readerUpto, ord);
+                      ordLength++;
+                    }
+                    docIDUpto++;
+                    continue;
+                  }
 
+                  docIDUpto++;
+                }
+              }
+            };
+          }
+        }
+     );
+  }
+  
+  // TODO: seek-by-ord to nextSetBit
+  static class BitsFilteredTermsEnum extends FilteredTermsEnum {
+    final OpenBitSet liveTerms;
+    
+    BitsFilteredTermsEnum(TermsEnum in, OpenBitSet liveTerms) {
+      super(in, false); // <-- not passing false here wasted about 3 hours of my time!!!!!!!!!!!!!
+      assert liveTerms != null;
+      this.liveTerms = liveTerms;
+    }
+
+    @Override
+    protected AcceptStatus accept(BytesRef term) throws IOException {
+      if (liveTerms.get(ord())) {
+        return AcceptStatus.YES;
+      } else {
+        return AcceptStatus.NO;
+      }
+    }
   }
 }
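
The OrdinalMap is the heart of the new merge path. A toy model of what it provides (illustration only; the real map is built from the filtered TermsEnums and packed structures): merge the deduplicated terms of all segments, and a term's position in the merged sort order is its global ord.

    String[][] segTerms = { {"apple", "pear"}, {"apple", "banana"} };
    TreeSet<String> merged = new TreeSet<String>();
    for (String[] seg : segTerms) {
      merged.addAll(Arrays.asList(seg));
    }
    Map<String,Integer> globalOrd = new HashMap<String,Integer>();
    int ord = 0;
    for (String term : merged) {
      globalOrd.put(term, ord++);          // apple=0, banana=1, pear=2
    }
    // segment 0 remaps apple->0, pear->2; segment 1 remaps apple->0, banana->1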
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/DocValuesProducer.java b/lucene/core/src/java/org/apache/lucene/codecs/DocValuesProducer.java
index d34c51d..b2c5d54 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/DocValuesProducer.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/DocValuesProducer.java
@@ -24,6 +24,7 @@
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 
 /** Abstract API that produces numeric, binary and
  * sorted docvalues.
@@ -50,4 +51,9 @@
    *  The returned instance need not be thread-safe: it will only be
    *  used by a single thread. */
   public abstract SortedDocValues getSorted(FieldInfo field) throws IOException;
+  
+  /** Returns {@link SortedSetDocValues} for this field.
+   *  The returned instance need not be thread-safe: it will only be
+   *  used by a single thread. */
+  public abstract SortedSetDocValues getSortedSet(FieldInfo field) throws IOException;
 }
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene40/Lucene40DocValuesReader.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene40/Lucene40DocValuesReader.java
index bd61680..21a082c 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene40/Lucene40DocValuesReader.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene40/Lucene40DocValuesReader.java
@@ -31,6 +31,7 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.store.CompoundFileDirectory;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
@@ -615,6 +616,11 @@
   }
   
   @Override
+  public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
+    throw new IllegalStateException("Lucene 4.0 does not support SortedSet: how did you pull this off?");
+  }
+
+  @Override
   public void close() throws IOException {
     dir.close();
   }
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene41/package.html b/lucene/core/src/java/org/apache/lucene/codecs/lucene41/package.html
index 6a8a5b1..3df0293 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene41/package.html
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene41/package.html
@@ -375,7 +375,8 @@
 term vectors.</li>
 <li>In version 4.1, the format of the postings list changed to use either
 of FOR compression or variable-byte encoding, depending upon the frequency
-of the term.</li>
+of the term. Terms appearing only once are now inlined directly into
+the term dictionary. Stored fields are compressed by default.</li>
 </ul>
 <a name="Limitations" id="Limitations"></a>
 <h2>Limitations</h2>
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesConsumer.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesConsumer.java
index 1512bdf..4b33a39 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesConsumer.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesConsumer.java
@@ -20,13 +20,17 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.DocValuesConsumer;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.store.ByteArrayDataOutput;
 import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.IntsRef;
@@ -195,13 +199,8 @@
       writer.finish();
     }
   }
-
-  @Override
-  public void addSortedField(FieldInfo field, Iterable<BytesRef> values, Iterable<Number> docToOrd) throws IOException {
-    // write the ordinals as numerics
-    addNumericField(field, docToOrd);
-    
-    // write the values as FST
+  
+  private void writeFST(FieldInfo field, Iterable<BytesRef> values) throws IOException {
     meta.writeVInt(field.number);
     meta.writeByte(FST);
     meta.writeLong(data.getFilePointer());
@@ -214,7 +213,94 @@
       ord++;
     }
     FST<Long> fst = builder.finish();
-    fst.save(data);
-    meta.writeVInt((int)ord);
+    if (fst != null) {
+      fst.save(data);
+    }
+    meta.writeVLong(ord);
+  }
+
+  @Override
+  public void addSortedField(FieldInfo field, Iterable<BytesRef> values, Iterable<Number> docToOrd) throws IOException {
+    // write the ordinals as numerics
+    addNumericField(field, docToOrd);
+    
+    // write the values as FST
+    writeFST(field, values);
+  }
+
+  // note: this might not be the most efficient... but it's fairly simple
+  @Override
+  public void addSortedSetField(FieldInfo field, Iterable<BytesRef> values, final Iterable<Number> docToOrdCount, final Iterable<Number> ords) throws IOException {
+    // write the ordinals as a binary field
+    addBinaryField(field, new Iterable<BytesRef>() {
+      @Override
+      public Iterator<BytesRef> iterator() {
+        return new SortedSetIterator(docToOrdCount.iterator(), ords.iterator());
+      }
+    });
+      
+    // write the values as FST
+    writeFST(field, values);
+  }
+  
+  // per-document vint-encoded byte[]
+  static class SortedSetIterator implements Iterator<BytesRef> {
+    byte[] buffer = new byte[10];
+    ByteArrayDataOutput out = new ByteArrayDataOutput();
+    BytesRef ref = new BytesRef();
+    
+    final Iterator<Number> counts;
+    final Iterator<Number> ords;
+    
+    SortedSetIterator(Iterator<Number> counts, Iterator<Number> ords) {
+      this.counts = counts;
+      this.ords = ords;
+    }
+    
+    @Override
+    public boolean hasNext() {
+      return counts.hasNext();
+    }
+
+    @Override
+    public BytesRef next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      
+      int count = counts.next().intValue();
+      int maxSize = count*9; // worst case
+      if (maxSize > buffer.length) {
+        buffer = ArrayUtil.grow(buffer, maxSize);
+      }
+      
+      try {
+        encodeValues(count);
+      } catch (IOException bogus) {
+        throw new RuntimeException(bogus);
+      }
+      
+      ref.bytes = buffer;
+      ref.offset = 0;
+      ref.length = out.getPosition();
+
+      return ref;
+    }
+    
+    // encodes count values to buffer
+    private void encodeValues(int count) throws IOException {
+      out.reset(buffer);
+      long lastOrd = 0;
+      for (int i = 0; i < count; i++) {
+        long ord = ords.next().longValue();
+        out.writeVLong(ord - lastOrd);
+        lastOrd = ord;
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
   }
 }
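
The producer-side inverse of SortedSetIterator, sketched with a hardcoded buffer (the real reader pulls each document's BytesRef from the binary field): ords come back by summing vLong deltas.

    byte[] encoded = {3, 4, 13};           // vLong deltas for the ord list {3, 7, 20}
    ByteArrayDataInput in = new ByteArrayDataInput(encoded);
    long ord = 0;
    while (!in.eof()) {
      ord += in.readVLong();               // yields 3, then 7, then 20
    }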
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesFormat.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesFormat.java
index 640218c..1cef435 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesFormat.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesFormat.java
@@ -34,7 +34,7 @@
 /**
  * Lucene 4.2 DocValues format.
  * <p>
- * Encodes the three per-document value types (Numeric,Binary,Sorted) with five basic strategies.
+ * Encodes the four per-document value types (Numeric,Binary,Sorted,SortedSet) with seven basic strategies.
  * <p>
  * <ul>
  *    <li>Delta-compressed Numerics: per-document integers written in blocks of 4096. For each block
@@ -51,7 +51,9 @@
  *        start for the block, and the average (expected) delta per entry. For each document the 
  *        deviation from the delta (actual - expected) is written.
  *    <li>Sorted: an FST mapping deduplicated terms to ordinals is written, along with the per-document
- *        ordinals written using one of the numeric stratgies above.
+ *        ordinals written using one of the numeric strategies above.
+ *    <li>SortedSet: an FST mapping deduplicated terms to ordinals is written, along with the per-document
+ *        ordinal list written using one of the binary strategies above.  
  * </ul>
  * <p>
  * Files:
@@ -77,6 +79,8 @@
  *   </ul>
  *   <p>Sorted fields have two entries: a SortedEntry with the FST metadata,
  *      and an ordinary NumericEntry for the document-to-ord metadata.</p>
+ *   <p>SortedSet fields have two entries: a SortedEntry with the FST metadata,
+ *      and an ordinary BinaryEntry for the document-to-ord-list metadata.</p>
  *   <p>FieldNumber of -1 indicates the end of metadata.</p>
 *   <p>EntryType is 0 (NumericEntry), 1 (BinaryEntry), or 2 (SortedEntry)</p>
  *   <p>DataOffset is the pointer to the start of the data in the DocValues data (.dvd)</p>
@@ -107,6 +111,8 @@
  *     <li>UncompressedNumerics --&gt; {@link DataOutput#writeByte Byte}<sup>maxdoc</sup></li>
  *     <li>Addresses --&gt; {@link MonotonicBlockPackedWriter MonotonicBlockPackedInts(blockSize=4096)}</li>
  *   </ul>
+ *   <p>SortedSet entries store the list of ordinals in their BinaryData as a
+ *      sequence of increasing {@link DataOutput#writeVLong vLong}s, delta-encoded.</p>
  * </ol>
  */
 public final class Lucene42DocValuesFormat extends DocValuesFormat {
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesProducer.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesProducer.java
index 38f3c2e..a9463b8 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesProducer.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42DocValuesProducer.java
@@ -31,6 +31,8 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.store.ByteArrayDataInput;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
@@ -123,7 +125,7 @@
       } else if (fieldType == Lucene42DocValuesConsumer.FST) {
         FSTEntry entry = new FSTEntry();
         entry.offset = meta.readLong();
-        entry.numOrds = meta.readVInt();
+        entry.numOrds = meta.readVLong();
         fsts.put(fieldNumber, entry);
       } else {
         throw new CorruptIndexException("invalid entry type: " + fieldType + ", input=" + meta);
@@ -281,12 +283,96 @@
 
       @Override
       public int getValueCount() {
-        return entry.numOrds;
+        return (int)entry.numOrds;
       }
     };
   }
   
   @Override
+  public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
+    final FSTEntry entry = fsts.get(field.number);
+    if (entry.numOrds == 0) {
+      return SortedSetDocValues.EMPTY; // empty FST!
+    }
+    FST<Long> instance;
+    synchronized(this) {
+      instance = fstInstances.get(field.number);
+      if (instance == null) {
+        data.seek(entry.offset);
+        instance = new FST<Long>(data, PositiveIntOutputs.getSingleton(true));
+        fstInstances.put(field.number, instance);
+      }
+    }
+    final BinaryDocValues docToOrds = getBinary(field);
+    final FST<Long> fst = instance;
+    
+    // per-thread resources
+    final BytesReader in = fst.getBytesReader();
+    final Arc<Long> firstArc = new Arc<Long>();
+    final Arc<Long> scratchArc = new Arc<Long>();
+    final IntsRef scratchInts = new IntsRef();
+    final BytesRefFSTEnum<Long> fstEnum = new BytesRefFSTEnum<Long>(fst); 
+    final BytesRef ref = new BytesRef();
+    final ByteArrayDataInput input = new ByteArrayDataInput();
+    return new SortedSetDocValues() {
+      long currentOrd;
+
+      @Override
+      public long nextOrd() {
+        if (input.eof()) {
+          return NO_MORE_ORDS;
+        } else {
+          currentOrd += input.readVLong();
+          return currentOrd;
+        }
+      }
+      
+      @Override
+      public void setDocument(int docID) {
+        docToOrds.get(docID, ref);
+        input.reset(ref.bytes, ref.offset, ref.length);
+        currentOrd = 0;
+      }
+
+      @Override
+      public void lookupOrd(long ord, BytesRef result) {
+        try {
+          in.setPosition(0);
+          fst.getFirstArc(firstArc);
+          IntsRef output = Util.getByOutput(fst, ord, in, firstArc, scratchArc, scratchInts);
+          result.bytes = new byte[output.length];
+          result.offset = 0;
+          result.length = 0;
+          Util.toBytesRef(output, result);
+        } catch (IOException bogus) {
+          throw new RuntimeException(bogus);
+        }
+      }
+
+      @Override
+      public long lookupTerm(BytesRef key) {
+        try {
+          InputOutput<Long> o = fstEnum.seekCeil(key);
+          if (o == null) {
+            return -getValueCount()-1;
+          } else if (o.input.equals(key)) {
+            return o.output.longValue();
+          } else {
+            return -o.output-1;
+          }
+        } catch (IOException bogus) {
+          throw new RuntimeException(bogus);
+        }
+      }
+
+      @Override
+      public long getValueCount() {
+        return entry.numOrds;
+      }
+    };
+  }
+
+  @Override
   public void close() throws IOException {
     data.close();
   }
@@ -308,6 +394,6 @@
   
   static class FSTEntry {
     long offset;
-    int numOrds;
+    long numOrds;
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42FieldInfosReader.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42FieldInfosReader.java
index c86eeae..1beee5a 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42FieldInfosReader.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42FieldInfosReader.java
@@ -114,6 +114,8 @@
       return DocValuesType.BINARY;
     } else if (b == 3) {
       return DocValuesType.SORTED;
+    } else if (b == 4) {
+      return DocValuesType.SORTED_SET;
     } else {
       throw new CorruptIndexException("invalid docvalues byte: " + b + " (resource=" + input + ")");
     }
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42FieldInfosWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42FieldInfosWriter.java
index 80baa07..ab10ba0 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42FieldInfosWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/Lucene42FieldInfosWriter.java
@@ -99,6 +99,8 @@
       return 2;
     } else if (type == DocValuesType.SORTED) {
       return 3;
+    } else if (type == DocValuesType.SORTED_SET) {
+      return 4;
     } else {
       throw new AssertionError();
     }
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/package.html b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/package.html
index 6cca5e8..9ed17df 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene42/package.html
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene42/package.html
@@ -375,7 +375,11 @@
 term vectors.</li>
 <li>In version 4.1, the format of the postings list changed to use either
 of FOR compression or variable-byte encoding, depending upon the frequency
-of the term.</li>
+of the term. Terms appearing only once are now inlined directly into
+the term dictionary. Stored fields are compressed by default.</li>
+<li>In version 4.2, term vectors are compressed by default. DocValues gained
+a new multi-valued type (SortedSet) that can be used for faceting, grouping,
+and joining on multi-valued fields.</li>
 </ul>
 <a name="Limitations" id="Limitations"></a>
 <h2>Limitations</h2>
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java b/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java
index 52f8a4b..72053a8 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java
@@ -35,6 +35,7 @@
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SegmentWriteState;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 
@@ -114,6 +115,11 @@
       getInstance(field).addSortedField(field, values, docToOrd);
     }
 
+    @Override
+    public void addSortedSetField(FieldInfo field, Iterable<BytesRef> values, Iterable<Number> docToOrdCount, Iterable<Number> ords) throws IOException {
+      getInstance(field).addSortedSetField(field, values, docToOrdCount, ords);
+    }
+
     private DocValuesConsumer getInstance(FieldInfo field) throws IOException {
       final DocValuesFormat format = getDocValuesFormatForField(field.name);
       if (format == null) {
@@ -255,6 +261,12 @@
     }
 
     @Override
+    public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
+      DocValuesProducer producer = fields.get(field.name);
+      return producer == null ? null : producer.getSortedSet(field);
+    }
+
+    @Override
     public void close() throws IOException {
       IOUtils.close(formats.values());
     }
diff --git a/lucene/core/src/java/org/apache/lucene/document/SortedSetDocValuesField.java b/lucene/core/src/java/org/apache/lucene/document/SortedSetDocValuesField.java
new file mode 100644
index 0000000..5b855ad
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/document/SortedSetDocValuesField.java
@@ -0,0 +1,61 @@
+package org.apache.lucene.document;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.util.BytesRef;
+
+/**
+ * <p>
+ * Field that stores
+ * a set of per-document {@link BytesRef} values, indexed for
+ * faceting, grouping, and joining.  Here's an example usage:
+ * 
+ * <pre class="prettyprint">
+ *   document.add(new SortedSetDocValuesField(name, new BytesRef("hello")));
+ *   document.add(new SortedSetDocValuesField(name, new BytesRef("world")));
+ * </pre>
+ * 
+ * <p>
+ * If you also need to store the value, you should add a
+ * separate {@link StoredField} instance.
+ * 
+ */
+
+public class SortedSetDocValuesField extends StoredField {
+
+  /**
+   * Type for sorted set DocValues
+   */
+  public static final FieldType TYPE = new FieldType();
+  static {
+    TYPE.setDocValueType(FieldInfo.DocValuesType.SORTED_SET);
+    TYPE.freeze();
+  }
+
+  /**
+   * Create a new sorted set DocValues field.
+   * @param name field name
+   * @param bytes binary content
+   * @throws IllegalArgumentException if the field name is null
+   */
+  public SortedSetDocValuesField(String name, BytesRef bytes) {
+    super(name, TYPE);
+    fieldsData = bytes;
+  }
+}
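
A read-side companion to the javadoc example above; a hedged sketch only (the helper
name printValues is illustrative), using the accessors this patch adds to AtomicReader
and SortedSetDocValues.

    import java.io.IOException;
    import org.apache.lucene.index.AtomicReader;
    import org.apache.lucene.index.SortedSetDocValues;
    import org.apache.lucene.util.BytesRef;

    // Prints a document's values in sorted order.
    static void printValues(AtomicReader reader, int docID, String field) throws IOException {
      SortedSetDocValues dv = reader.getSortedSetDocValues(field);
      if (dv == null) {
        return;                                    // field has no SORTED_SET doc values
      }
      dv.setDocument(docID);
      BytesRef scratch = new BytesRef();
      long ord;
      while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
        dv.lookupOrd(ord, scratch);                // resolve ordinal -> byte[]
        System.out.println(scratch.utf8ToString());
      }
    }
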
diff --git a/lucene/core/src/java/org/apache/lucene/index/AtomicReader.java b/lucene/core/src/java/org/apache/lucene/index/AtomicReader.java
index e545df4..cfc7359 100644
--- a/lucene/core/src/java/org/apache/lucene/index/AtomicReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/AtomicReader.java
@@ -174,6 +174,12 @@
    *  this field.  The returned instance should only be
    *  used by a single thread. */
   public abstract SortedDocValues getSortedDocValues(String field) throws IOException;
+  
+  /** Returns {@link SortedSetDocValues} for this field, or
+   *  null if no {@link SortedSetDocValues} were indexed for
+   *  this field.  The returned instance should only be
+   *  used by a single thread. */
+  public abstract SortedSetDocValues getSortedSetDocValues(String field) throws IOException;
 
   /** Returns {@link NumericDocValues} representing norms
    *  for this field, or null if no {@link NumericDocValues}
diff --git a/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesWriter.java b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesWriter.java
index 178ca1f..e9cc2fa 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesWriter.java
@@ -91,7 +91,7 @@
   private class BytesIterator implements Iterator<BytesRef> {
     final BytesRef value = new BytesRef();
     final AppendingLongBuffer.Iterator lengthsIterator = lengths.iterator();
-    final int size = lengths.size();
+    final int size = (int) lengths.size();
     final int maxDoc;
     int upto;
     long byteOffset;
diff --git a/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java b/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
index f151cc5..0edadfc 100644
--- a/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
+++ b/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
@@ -42,6 +42,7 @@
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CommandLineUtil;
 import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.OpenBitSet;
 import org.apache.lucene.util.StringHelper;
 
 /**
@@ -1275,7 +1276,8 @@
         } else {
           if (reader.getBinaryDocValues(fieldInfo.name) != null ||
               reader.getNumericDocValues(fieldInfo.name) != null ||
-              reader.getSortedDocValues(fieldInfo.name) != null) {
+              reader.getSortedDocValues(fieldInfo.name) != null || 
+              reader.getSortedSetDocValues(fieldInfo.name) != null) {
             throw new RuntimeException("field: " + fieldInfo.name + " has docvalues but should omit them!");
           }
         }
@@ -1333,6 +1335,47 @@
     }
   }
   
+  private static void checkSortedSetDocValues(String fieldName, AtomicReader reader, SortedSetDocValues dv) {
+    final long maxOrd = dv.getValueCount()-1;
+    OpenBitSet seenOrds = new OpenBitSet(dv.getValueCount());
+    long maxOrd2 = -1;
+    for (int i = 0; i < reader.maxDoc(); i++) {
+      dv.setDocument(i);
+      long lastOrd = -1;
+      long ord;
+      while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+        if (ord <= lastOrd) {
+          throw new RuntimeException("ords out of order: " + ord + " <= " + lastOrd + " for doc: " + i);
+        }
+        if (ord < 0 || ord > maxOrd) {
+          throw new RuntimeException("ord out of bounds: " + ord);
+        }
+        lastOrd = ord;
+        maxOrd2 = Math.max(maxOrd2, ord);
+        seenOrds.set(ord);
+      }
+    }
+    if (maxOrd != maxOrd2) {
+      throw new RuntimeException("dv for field: " + fieldName + " reports wrong maxOrd=" + maxOrd + " but this is not the case: " + maxOrd2);
+    }
+    if (seenOrds.cardinality() != dv.getValueCount()) {
+      throw new RuntimeException("dv for field: " + fieldName + " has holes in its ords, valueCount=" + dv.getValueCount() + " but only used: " + seenOrds.cardinality());
+    }
+    
+    BytesRef lastValue = null;
+    BytesRef scratch = new BytesRef();
+    for (long i = 0; i <= maxOrd; i++) {
+      dv.lookupOrd(i, scratch);
+      assert scratch.isValid();
+      if (lastValue != null) {
+        if (scratch.compareTo(lastValue) <= 0) {
+          throw new RuntimeException("dv for field: " + fieldName + " has ords out of order: " + lastValue + " >=" + scratch);
+        }
+      }
+      lastValue = BytesRef.deepCopyOf(scratch);
+    }
+  }
+
   private static void checkNumericDocValues(String fieldName, AtomicReader reader, NumericDocValues ndv) {
     for (int i = 0; i < reader.maxDoc(); i++) {
       ndv.get(i);
@@ -1343,12 +1386,35 @@
     switch(fi.getDocValuesType()) {
       case SORTED:
         checkSortedDocValues(fi.name, reader, reader.getSortedDocValues(fi.name));
+        if (reader.getBinaryDocValues(fi.name) != null ||
+            reader.getNumericDocValues(fi.name) != null ||
+            reader.getSortedSetDocValues(fi.name) != null) {
+          throw new RuntimeException(fi.name + " returns multiple docvalues types!");
+        }
+        break;
+      case SORTED_SET:
+        checkSortedSetDocValues(fi.name, reader, reader.getSortedSetDocValues(fi.name));
+        if (reader.getBinaryDocValues(fi.name) != null ||
+            reader.getNumericDocValues(fi.name) != null ||
+            reader.getSortedDocValues(fi.name) != null) {
+          throw new RuntimeException(fi.name + " returns multiple docvalues types!");
+        }
         break;
       case BINARY:
         checkBinaryDocValues(fi.name, reader, reader.getBinaryDocValues(fi.name));
+        if (reader.getNumericDocValues(fi.name) != null ||
+            reader.getSortedDocValues(fi.name) != null ||
+            reader.getSortedSetDocValues(fi.name) != null) {
+          throw new RuntimeException(fi.name + " returns multiple docvalues types!");
+        }
         break;
       case NUMERIC:
         checkNumericDocValues(fi.name, reader, reader.getNumericDocValues(fi.name));
+        if (reader.getBinaryDocValues(fi.name) != null ||
+            reader.getSortedDocValues(fi.name) != null ||
+            reader.getSortedSetDocValues(fi.name) != null) {
+          throw new RuntimeException(fi.name + " returns multiple docvalues types!");
+        }
         break;
       default:
         throw new AssertionError();
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java b/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java
index e3ab779..734007e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java
@@ -265,6 +265,10 @@
 
   /** Call this only once (if you subclass!) */
   protected void uninvert(final AtomicReader reader, final BytesRef termPrefix) throws IOException {
+    final FieldInfo info = reader.getFieldInfos().fieldInfo(field);
+    if (info != null && info.hasDocValues()) {
+      throw new IllegalStateException("Type mismatch: " + field + " was indexed as " + info.getDocValuesType());
+    }
     //System.out.println("DTO uninvert field=" + field + " prefix=" + termPrefix);
     final long startTime = System.currentTimeMillis();
     prefix = termPrefix == null ? null : BytesRef.deepCopyOf(termPrefix);
@@ -596,93 +600,6 @@
     return pos;
   }
 
-  /** Iterates over the ords for a single document. */
-  public class TermOrdsIterator {
-    private int tnum;
-    private int upto;
-    private byte[] arr;
-
-    TermOrdsIterator() {
-    }
-
-    /** Buffer must be at least 5 ints long.  Returns number
-     *  of term ords placed into buffer; if this count is
-     *  less than buffer.length then that is the end. */
-    public int read(int[] buffer) {
-      int bufferUpto = 0;
-      if (arr == null) {
-        // code is inlined into upto
-        //System.out.println("inlined");
-        int code = upto;
-        int delta = 0;
-        for (;;) {
-          delta = (delta << 7) | (code & 0x7f);
-          if ((code & 0x80)==0) {
-            if (delta==0) break;
-            tnum += delta - TNUM_OFFSET;
-            buffer[bufferUpto++] = ordBase+tnum;
-            //System.out.println("  tnum=" + tnum);
-            delta = 0;
-          }
-          code >>>= 8;
-        }
-      } else {
-        // code is a pointer
-        for(;;) {
-          int delta = 0;
-          for(;;) {
-            byte b = arr[upto++];
-            delta = (delta << 7) | (b & 0x7f);
-            //System.out.println("    cycle: upto=" + upto + " delta=" + delta + " b=" + b);
-            if ((b & 0x80) == 0) break;
-          }
-          //System.out.println("  delta=" + delta);
-          if (delta == 0) break;
-          tnum += delta - TNUM_OFFSET;
-          //System.out.println("  tnum=" + tnum);
-          buffer[bufferUpto++] = ordBase+tnum;
-          if (bufferUpto == buffer.length) {
-            break;
-          }
-        }
-      }
-
-      return bufferUpto;
-    }
-
-    /** Reset the iterator on a new document. */
-    public TermOrdsIterator reset(int docID) {
-      //System.out.println("  reset docID=" + docID);
-      tnum = 0;
-      final int code = index[docID];
-      if ((code & 0xff)==1) {
-        // a pointer
-        upto = code>>>8;
-        //System.out.println("    pointer!  upto=" + upto);
-        int whichArray = (docID >>> 16) & 0xff;
-        arr = tnums[whichArray];
-      } else {
-        //System.out.println("    inline!");
-        arr = null;
-        upto = code;
-      }
-      return this;
-    }
-  }
-
-  /** Returns an iterator to step through the term ords for
-   *  this document.  It's also possible to subclass this
-   *  class and directly access members. */
-  public TermOrdsIterator lookup(int doc, TermOrdsIterator reuse) {
-    final TermOrdsIterator ret;
-    if (reuse != null) {
-      ret = reuse;
-    } else {
-      ret = new TermOrdsIterator();
-    }
-    return ret.reset(doc);
-  }
-
   /* Only used if original IndexReader doesn't implement
    * ord; in this case we "wrap" our own terms index
    * around it. */
@@ -847,4 +764,124 @@
     termsEnum.seekExact(ord);
     return termsEnum.term();
   }
+  
+  /** Returns a SortedSetDocValues view of this instance */
+  public SortedSetDocValues iterator(TermsEnum termsEnum) throws IOException {
+    if (isEmpty()) {
+      return SortedSetDocValues.EMPTY;
+    } else {
+      return new Iterator(termsEnum);
+    }
+  }
+  
+  private class Iterator extends SortedSetDocValues {
+    final TermsEnum te;
+    // currently we read 5 at a time (using the logic of the old iterator)
+    final int buffer[] = new int[5];
+    int bufferUpto;
+    int bufferLength;
+    
+    private int tnum;
+    private int upto;
+    private byte[] arr;
+    
+    Iterator(TermsEnum te) {
+      this.te = te;
+    }
+    
+    @Override
+    public long nextOrd() {
+      while (bufferUpto == bufferLength) {
+        if (bufferLength < buffer.length) {
+          return NO_MORE_ORDS;
+        } else {
+          bufferLength = read(buffer);
+          bufferUpto = 0;
+        }
+      }
+      return buffer[bufferUpto++];
+    }
+    
+    /** Buffer must be at least 5 ints long.  Returns number
+     *  of term ords placed into buffer; if this count is
+     *  less than buffer.length then that is the end. */
+    int read(int[] buffer) {
+      int bufferUpto = 0;
+      if (arr == null) {
+        // code is inlined into upto
+        //System.out.println("inlined");
+        int code = upto;
+        int delta = 0;
+        for (;;) {
+          delta = (delta << 7) | (code & 0x7f);
+          if ((code & 0x80)==0) {
+            if (delta==0) break;
+            tnum += delta - TNUM_OFFSET;
+            buffer[bufferUpto++] = ordBase+tnum;
+            //System.out.println("  tnum=" + tnum);
+            delta = 0;
+          }
+          code >>>= 8;
+        }
+      } else {
+        // code is a pointer
+        for(;;) {
+          int delta = 0;
+          for(;;) {
+            byte b = arr[upto++];
+            delta = (delta << 7) | (b & 0x7f);
+            //System.out.println("    cycle: upto=" + upto + " delta=" + delta + " b=" + b);
+            if ((b & 0x80) == 0) break;
+          }
+          //System.out.println("  delta=" + delta);
+          if (delta == 0) break;
+          tnum += delta - TNUM_OFFSET;
+          //System.out.println("  tnum=" + tnum);
+          buffer[bufferUpto++] = ordBase+tnum;
+          if (bufferUpto == buffer.length) {
+            break;
+          }
+        }
+      }
+
+      return bufferUpto;
+    }
+
+    @Override
+    public void setDocument(int docID) {
+      tnum = 0;
+      final int code = index[docID];
+      if ((code & 0xff)==1) {
+        // a pointer
+        upto = code>>>8;
+        //System.out.println("    pointer!  upto=" + upto);
+        int whichArray = (docID >>> 16) & 0xff;
+        arr = tnums[whichArray];
+      } else {
+        //System.out.println("    inline!");
+        arr = null;
+        upto = code;
+      }
+      bufferUpto = 0;
+      bufferLength = read(buffer);
+    }
+
+    @Override
+    public void lookupOrd(long ord, BytesRef result) {
+      BytesRef ref = null;
+      try {
+        ref = DocTermOrds.this.lookupTerm(te, (int) ord);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      result.bytes = ref.bytes;
+      result.offset = ref.offset;
+      result.length = ref.length;
+    }
+
+    @Override
+    public long getValueCount() {
+      return numTerms();
+    }
+  }
 }
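
A hedged sketch of obtaining the new SortedSetDocValues view from DocTermOrds (the
helper name uninvert is illustrative; the DocTermOrds constructor and getOrdTermsEnum
signatures are assumed from this branch):

    import java.io.IOException;
    import org.apache.lucene.index.AtomicReader;
    import org.apache.lucene.index.DocTermOrds;
    import org.apache.lucene.index.SortedSetDocValues;

    // Uninverts an indexed field and exposes it through the new multi-valued API.
    static SortedSetDocValues uninvert(AtomicReader reader, String field) throws IOException {
      DocTermOrds dto = new DocTermOrds(reader, field);    // walks the field's terms once
      return dto.iterator(dto.getOrdTermsEnum(reader));    // view added by this patch
    }
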
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java
index c8aaeb0..90f2e45 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java
@@ -57,6 +57,8 @@
         addBinaryField(fieldInfo, docID, field.binaryValue());
       } else if (dvType == DocValuesType.SORTED) {
         addSortedField(fieldInfo, docID, field.binaryValue());
+      } else if (dvType == DocValuesType.SORTED_SET) {
+        addSortedSetField(fieldInfo, docID, field.binaryValue());
       } else if (dvType == DocValuesType.NUMERIC) {
         if (!(field.numericValue() instanceof Long)) {
           throw new IllegalArgumentException("illegal type " + field.numericValue().getClass() + ": DocValues types must be Long");
@@ -122,6 +124,20 @@
     }
     sortedWriter.addValue(docID, value);
   }
+  
+  void addSortedSetField(FieldInfo fieldInfo, int docID, BytesRef value) {
+    DocValuesWriter writer = writers.get(fieldInfo.name);
+    SortedSetDocValuesWriter sortedSetWriter;
+    if (writer == null) {
+      sortedSetWriter = new SortedSetDocValuesWriter(fieldInfo, bytesUsed);
+      writers.put(fieldInfo.name, sortedSetWriter);
+    } else if (!(writer instanceof SortedSetDocValuesWriter)) {
+      throw new IllegalArgumentException("Incompatible DocValues type: field \"" + fieldInfo.name + "\" changed from " + getTypeDesc(writer) + " to sorted");
+    } else {
+      sortedSetWriter = (SortedSetDocValuesWriter) writer;
+    }
+    sortedSetWriter.addValue(docID, value);
+  }
 
   void addNumericField(FieldInfo fieldInfo, int docID, long value) {
     DocValuesWriter writer = writers.get(fieldInfo.name);
diff --git a/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java b/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java
index fc3236d..3cba54e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java
@@ -101,7 +101,14 @@
      * byte[]. The stored byte[] is presorted and allows access via document id, 
      * ordinal and by-value.
      */
-    SORTED
+    SORTED,
+    /** 
+     * A pre-sorted Set&lt;byte[]&gt;. Fields with this type only store distinct byte values 
+     * and store additional offset pointers per document to dereference the shared 
+     * byte[]s. The stored byte[] is presorted and allows access via document id, 
+     * ordinal and by-value.
+     */
+    SORTED_SET
   };
 
   /**
diff --git a/lucene/core/src/java/org/apache/lucene/index/FilterAtomicReader.java b/lucene/core/src/java/org/apache/lucene/index/FilterAtomicReader.java
index 44775e1..ae1c39b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FilterAtomicReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FilterAtomicReader.java
@@ -424,6 +424,12 @@
   }
 
   @Override
+  public SortedSetDocValues getSortedSetDocValues(String field) throws IOException {
+    ensureOpen();
+    return in.getSortedSetDocValues(field);
+  }
+
+  @Override
   public NumericDocValues getNormValues(String field) throws IOException {
     ensureOpen();
     return in.getNormValues(field);
diff --git a/lucene/core/src/java/org/apache/lucene/index/MultiDocValues.java b/lucene/core/src/java/org/apache/lucene/index/MultiDocValues.java
index 6416dcc..97f2eec 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MultiDocValues.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MultiDocValues.java
@@ -24,6 +24,7 @@
 import org.apache.lucene.index.MultiTermsEnum.TermsEnumWithSlice;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.packed.AppendingLongBuffer;
+import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;
 
 /**
  * A wrapper for CompositeIndexReader providing access to DocValues.
@@ -214,61 +215,151 @@
     if (!anyReal) {
       return null;
     } else {
-      OrdinalMap mapping = new OrdinalMap(r.getCoreCacheKey(), values);
+      TermsEnum enums[] = new TermsEnum[values.length];
+      for (int i = 0; i < values.length; i++) {
+        enums[i] = new SortedDocValuesTermsEnum(values[i]);
+      }
+      OrdinalMap mapping = new OrdinalMap(r.getCoreCacheKey(), enums);
       return new MultiSortedDocValues(values, starts, mapping);
     }
   }
   
+  /** Returns a SortedSetDocValues for a reader's docvalues (potentially doing extremely slow things).
+   * <p>
+   * This is an extremely slow way to access sorted values. Instead, access them per-segment
+   * with {@link AtomicReader#getSortedSetDocValues(String)}
+   * </p>  
+   */
+  public static SortedSetDocValues getSortedSetValues(final IndexReader r, final String field) throws IOException {
+    final List<AtomicReaderContext> leaves = r.leaves();
+    final int size = leaves.size();
+    
+    if (size == 0) {
+      return null;
+    } else if (size == 1) {
+      return leaves.get(0).reader().getSortedSetDocValues(field);
+    }
+    
+    boolean anyReal = false;
+    final SortedSetDocValues[] values = new SortedSetDocValues[size];
+    final int[] starts = new int[size+1];
+    for (int i = 0; i < size; i++) {
+      AtomicReaderContext context = leaves.get(i);
+      SortedSetDocValues v = context.reader().getSortedSetDocValues(field);
+      if (v == null) {
+        v = SortedSetDocValues.EMPTY;
+      } else {
+        anyReal = true;
+      }
+      values[i] = v;
+      starts[i] = context.docBase;
+    }
+    starts[size] = r.maxDoc();
+    
+    if (!anyReal) {
+      return null;
+    } else {
+      TermsEnum enums[] = new TermsEnum[values.length];
+      for (int i = 0; i < values.length; i++) {
+        enums[i] = new SortedSetDocValuesTermsEnum(values[i]);
+      }
+      OrdinalMap mapping = new OrdinalMap(r.getCoreCacheKey(), enums);
+      return new MultiSortedSetDocValues(values, starts, mapping);
+    }
+  }
+  
   /** maps per-segment ordinals to/from global ordinal space */
-  // TODO: use more efficient packed ints structures (these are all positive values!)
-  static class OrdinalMap {
+  // TODO: use more efficient packed ints structures?
+  // TODO: pull this out? it's pretty generic (maps between N ord()-enabled TermsEnums)
+  public static class OrdinalMap {
     // cache key of whoever asked for this awful thing
     final Object owner;
     // globalOrd -> (globalOrd - segmentOrd)
-    final AppendingLongBuffer globalOrdDeltas;
+    final MonotonicAppendingLongBuffer globalOrdDeltas;
     // globalOrd -> sub index
     final AppendingLongBuffer subIndexes;
     // segmentOrd -> (globalOrd - segmentOrd)
-    final AppendingLongBuffer ordDeltas[];
+    final MonotonicAppendingLongBuffer ordDeltas[];
     
-    OrdinalMap(Object owner, SortedDocValues subs[]) throws IOException {
+    /** 
+     * Creates an ordinal map that allows mapping ords to/from a merged
+     * space from <code>subs</code>.
+     * @param owner a cache key
+     * @param subs TermsEnums that support {@link TermsEnum#ord()}. They need
+     *             not be dense (e.g. they can be FilteredTermsEnums).
+     * @throws IOException if an I/O error occurred.
+     */
+    public OrdinalMap(Object owner, TermsEnum subs[]) throws IOException {
       // create the ordinal mappings by pulling a termsenum over each sub's 
       // unique terms, and walking a multitermsenum over those
       this.owner = owner;
-      globalOrdDeltas = new AppendingLongBuffer();
+      globalOrdDeltas = new MonotonicAppendingLongBuffer();
       subIndexes = new AppendingLongBuffer();
-      ordDeltas = new AppendingLongBuffer[subs.length];
+      ordDeltas = new MonotonicAppendingLongBuffer[subs.length];
       for (int i = 0; i < ordDeltas.length; i++) {
-        ordDeltas[i] = new AppendingLongBuffer();
+        ordDeltas[i] = new MonotonicAppendingLongBuffer();
       }
-      int segmentOrds[] = new int[subs.length];
+      long segmentOrds[] = new long[subs.length];
       ReaderSlice slices[] = new ReaderSlice[subs.length];
       TermsEnumIndex indexes[] = new TermsEnumIndex[slices.length];
       for (int i = 0; i < slices.length; i++) {
         slices[i] = new ReaderSlice(0, 0, i);
-        indexes[i] = new TermsEnumIndex(new SortedDocValuesTermsEnum(subs[i]), i);
+        indexes[i] = new TermsEnumIndex(subs[i], i);
       }
       MultiTermsEnum mte = new MultiTermsEnum(slices);
       mte.reset(indexes);
-      int globalOrd = 0;
+      long globalOrd = 0;
       while (mte.next() != null) {        
         TermsEnumWithSlice matches[] = mte.getMatchArray();
         for (int i = 0; i < mte.getMatchCount(); i++) {
           int subIndex = matches[i].index;
-          int delta = globalOrd - segmentOrds[subIndex];
-          assert delta >= 0;
+          long segmentOrd = matches[i].terms.ord();
+          long delta = globalOrd - segmentOrd;
           // for each unique term, just mark the first subindex/delta where it occurs
           if (i == 0) {
             subIndexes.add(subIndex);
             globalOrdDeltas.add(delta);
           }
           // for each per-segment ord, map it back to the global term.
-          ordDeltas[subIndex].add(delta);
-          segmentOrds[subIndex]++;
+          while (segmentOrds[subIndex] <= segmentOrd) {
+            ordDeltas[subIndex].add(delta);
+            segmentOrds[subIndex]++;
+          }
         }
         globalOrd++;
       }
     }
+    
+    /** 
+     * Given a segment number and segment ordinal, returns
+     * the corresponding global ordinal.
+     */
+    public long getGlobalOrd(int subIndex, long segmentOrd) {
+      return segmentOrd + ordDeltas[subIndex].get(segmentOrd);
+    }
+
+    /**
+     * Given a segment number and global ordinal, returns
+     * the corresponding segment ordinal.
+     */
+    public long getSegmentOrd(int subIndex, long globalOrd) {
+      return globalOrd - globalOrdDeltas.get(globalOrd);
+    }
+    
+    /** 
+     * Given a global ordinal, returns the index of the first
+     * sub that contains this term.
+     */
+    public int getSegmentNumber(long globalOrd) {
+      return (int) subIndexes.get(globalOrd);
+    }
+    
+    /**
+     * Returns the total number of unique terms in global ord space.
+     */
+    public long getValueCount() {
+      return globalOrdDeltas.size();
+    }
   }
   
   /** implements SortedDocValues over n subs, using an OrdinalMap */
@@ -289,19 +380,63 @@
     public int getOrd(int docID) {
       int subIndex = ReaderUtil.subIndex(docID, docStarts);
       int segmentOrd = values[subIndex].getOrd(docID - docStarts[subIndex]);
-      return (int) (segmentOrd + mapping.ordDeltas[subIndex].get(segmentOrd));
+      return (int) mapping.getGlobalOrd(subIndex, segmentOrd);
     }
  
     @Override
     public void lookupOrd(int ord, BytesRef result) {
-      int subIndex = (int) mapping.subIndexes.get(ord);
-      int segmentOrd = (int) (ord - mapping.globalOrdDeltas.get(ord));
+      int subIndex = mapping.getSegmentNumber(ord);
+      int segmentOrd = (int) mapping.getSegmentOrd(subIndex, ord);
       values[subIndex].lookupOrd(segmentOrd, result);
     }
  
     @Override
     public int getValueCount() {
-      return mapping.globalOrdDeltas.size();
+      return (int) mapping.getValueCount();
+    }
+  }
+  
+  /** implements SortedSetDocValues over n subs, using an OrdinalMap */
+  static class MultiSortedSetDocValues extends SortedSetDocValues {
+    final int docStarts[];
+    final SortedSetDocValues values[];
+    final OrdinalMap mapping;
+    int currentSubIndex;
+    
+    MultiSortedSetDocValues(SortedSetDocValues values[], int docStarts[], OrdinalMap mapping) throws IOException {
+      assert values.length == mapping.ordDeltas.length;
+      assert docStarts.length == values.length + 1;
+      this.values = values;
+      this.docStarts = docStarts;
+      this.mapping = mapping;
+    }
+    
+    @Override
+    public long nextOrd() {
+      long segmentOrd = values[currentSubIndex].nextOrd();
+      if (segmentOrd == NO_MORE_ORDS) {
+        return segmentOrd;
+      } else {
+        return mapping.getGlobalOrd(currentSubIndex, segmentOrd);
+      }
+    }
+
+    @Override
+    public void setDocument(int docID) {
+      currentSubIndex = ReaderUtil.subIndex(docID, docStarts);
+      values[currentSubIndex].setDocument(docID - docStarts[currentSubIndex]);
+    }
+ 
+    @Override
+    public void lookupOrd(long ord, BytesRef result) {
+      int subIndex = mapping.getSegmentNumber(ord);
+      long segmentOrd = mapping.getSegmentOrd(subIndex, ord);
+      values[subIndex].lookupOrd(segmentOrd, result);
+    }
+ 
+    @Override
+    public long getValueCount() {
+      return mapping.getValueCount();
     }
   }
 }
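
The OrdinalMap accessors above obey a simple round-trip contract; here is a
sanity-check sketch (checkRoundTrip is an illustrative name). Note that
getSegmentNumber returns the first sub containing a term, so the round trip is
only guaranteed for that sub:

    // For every global ord: find its first sub and segment ord, then map back.
    static void checkRoundTrip(MultiDocValues.OrdinalMap map) {
      for (long globalOrd = 0; globalOrd < map.getValueCount(); globalOrd++) {
        int sub = map.getSegmentNumber(globalOrd);      // first sub with this term
        long segmentOrd = map.getSegmentOrd(sub, globalOrd);
        assert map.getGlobalOrd(sub, segmentOrd) == globalOrd;
      }
    }
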
diff --git a/lucene/core/src/java/org/apache/lucene/index/MultiTermsEnum.java b/lucene/core/src/java/org/apache/lucene/index/MultiTermsEnum.java
index 5255ef4..bcaac7c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MultiTermsEnum.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MultiTermsEnum.java
@@ -497,7 +497,7 @@
 
   final static class TermsEnumWithSlice {
     private final ReaderSlice subSlice;
-    private TermsEnum terms;
+    TermsEnum terms;
     public BytesRef current;
     final int index;
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesWriter.java b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesWriter.java
index 1a3a315..a77e8ea 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesWriter.java
@@ -50,7 +50,7 @@
     }
 
     // Fill in any holes:
-    for (int i = pending.size(); i < docID; ++i) {
+    for (int i = (int)pending.size(); i < docID; ++i) {
       pending.add(MISSING);
     }
 
@@ -90,7 +90,7 @@
   // iterates over the values we have in ram
   private class NumericIterator implements Iterator<Number> {
     final AppendingLongBuffer.Iterator iter = pending.iterator();
-    final int size = pending.size();
+    final int size = (int)pending.size();
     final int maxDoc;
     int upto;
     
diff --git a/lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java b/lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java
index 4244ecd..496ba94 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java
@@ -285,6 +285,13 @@
   }
 
   @Override
+  public SortedSetDocValues getSortedSetDocValues(String field) throws IOException {
+    ensureOpen();
+    AtomicReader reader = fieldToReader.get(field);
+    return reader == null ? null : reader.getSortedSetDocValues(field);
+  }
+
+  @Override
   public NumericDocValues getNormValues(String field) throws IOException {
     ensureOpen();
     AtomicReader reader = fieldToReader.get(field);
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java b/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java
index b1f72dc..e5498a2 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java
@@ -253,6 +253,34 @@
 
     return dvs;
   }
+  
+  SortedSetDocValues getSortedSetDocValues(String field) throws IOException {
+    FieldInfo fi = fieldInfos.fieldInfo(field);
+    if (fi == null) {
+      // Field does not exist
+      return null;
+    }
+    if (fi.getDocValuesType() == null) {
+      // Field was not indexed with doc values
+      return null;
+    }
+    if (fi.getDocValuesType() != DocValuesType.SORTED_SET) {
+      // DocValues were not of sorted set type
+      return null;
+    }
+
+    assert dvProducer != null;
+
+    Map<String,Object> dvFields = docValuesLocal.get();
+
+    SortedSetDocValues dvs = (SortedSetDocValues) dvFields.get(field);
+    if (dvs == null) {
+      dvs = dvProducer.getSortedSet(fi);
+      dvFields.put(field, dvs);
+    }
+
+    return dvs;
+  }
 
   NumericDocValues getNormValues(String field) throws IOException {
     FieldInfo fi = fieldInfos.fieldInfo(field);
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
index c771501..4afb911 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
@@ -197,6 +197,16 @@
               toMerge.add(values);
             }
             consumer.mergeSortedField(field, mergeState, toMerge);
+          } else if (type == DocValuesType.SORTED_SET) {
+            List<SortedSetDocValues> toMerge = new ArrayList<SortedSetDocValues>();
+            for (AtomicReader reader : mergeState.readers) {
+              SortedSetDocValues values = reader.getSortedSetDocValues(field.name);
+              if (values == null) {
+                values = SortedSetDocValues.EMPTY;
+              }
+              toMerge.add(values);
+            }
+            consumer.mergeSortedSetField(field, mergeState, toMerge);
           } else {
             throw new AssertionError("type=" + type);
           }
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java b/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
index 62e8c18..aee16b1 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
@@ -248,6 +248,12 @@
   }
 
   @Override
+  public SortedSetDocValues getSortedSetDocValues(String field) throws IOException {
+    ensureOpen();
+    return core.getSortedSetDocValues(field);
+  }
+
+  @Override
   public NumericDocValues getNormValues(String field) throws IOException {
     ensureOpen();
     return core.getNormValues(field);
diff --git a/lucene/core/src/java/org/apache/lucene/index/SingletonSortedSetDocValues.java b/lucene/core/src/java/org/apache/lucene/index/SingletonSortedSetDocValues.java
new file mode 100644
index 0000000..d44bb81
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/index/SingletonSortedSetDocValues.java
@@ -0,0 +1,71 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.BytesRef;
+
+/** 
+ * Exposes a multi-valued view over a single-valued instance.
+ * <p>
+ * This can be used if you want one multi-valued implementation
+ * (e.g. against FieldCache.getDocTermOrds) that also works for
+ * single-valued fields.
+ */
+public class SingletonSortedSetDocValues extends SortedSetDocValues {
+  private final SortedDocValues in;
+  private int docID;
+  private boolean set;
+  
+  /** Creates a multi-valued view over the provided SortedDocValues */
+  public SingletonSortedSetDocValues(SortedDocValues in) {
+    this.in = in;
+    assert NO_MORE_ORDS == -1; // this allows our nextOrd() to work for missing values without a check
+  }
+
+  @Override
+  public long nextOrd() {
+    if (set) {
+      return NO_MORE_ORDS;
+    } else {
+      set = true;
+      return in.getOrd(docID);
+    }
+  }
+
+  @Override
+  public void setDocument(int docID) {
+    this.docID = docID;
+    set = false;
+  }
+
+  @Override
+  public void lookupOrd(long ord, BytesRef result) {
+    // cast is ok: single-valued cannot exceed Integer.MAX_VALUE
+    in.lookupOrd((int)ord, result);
+  }
+
+  @Override
+  public long getValueCount() {
+    return in.getValueCount();
+  }
+
+  @Override
+  public long lookupTerm(BytesRef key) {
+    return in.lookupTerm(key);
+  }
+}
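
For example, a hedged helper (asSet is an illustrative name) that lets SORTED and
SORTED_SET fields share one consumption path:

    import java.io.IOException;
    import org.apache.lucene.index.AtomicReader;
    import org.apache.lucene.index.SingletonSortedSetDocValues;
    import org.apache.lucene.index.SortedDocValues;
    import org.apache.lucene.index.SortedSetDocValues;

    // Wraps a single-valued field so callers can always use the set API.
    static SortedSetDocValues asSet(AtomicReader reader, String field) throws IOException {
      SortedDocValues single = reader.getSortedDocValues(field);
      return single == null ? null : new SingletonSortedSetDocValues(single);
    }

Each document then yields at most one ord from nextOrd(); missing values fall out
naturally because getOrd's -1 coincides with NO_MORE_ORDS, as the constructor's
assert notes.
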
diff --git a/lucene/core/src/java/org/apache/lucene/index/SlowCompositeReaderWrapper.java b/lucene/core/src/java/org/apache/lucene/index/SlowCompositeReaderWrapper.java
index 32380f2..4330a7b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SlowCompositeReaderWrapper.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SlowCompositeReaderWrapper.java
@@ -24,7 +24,9 @@
 import org.apache.lucene.util.Bits;
 
 import org.apache.lucene.index.DirectoryReader; // javadoc
+import org.apache.lucene.index.FieldInfo.DocValuesType;
 import org.apache.lucene.index.MultiDocValues.MultiSortedDocValues;
+import org.apache.lucene.index.MultiDocValues.MultiSortedSetDocValues;
 import org.apache.lucene.index.MultiDocValues.OrdinalMap;
 import org.apache.lucene.index.MultiReader; // javadoc
 
@@ -113,8 +115,10 @@
         return dv;
       }
     }
-    // cached multi dv
-    assert map != null;
+    // cached ordinal map
+    if (getFieldInfos().fieldInfo(field).getDocValuesType() != DocValuesType.SORTED) {
+      return null;
+    }
     int size = in.leaves().size();
     final SortedDocValues[] values = new SortedDocValues[size];
     final int[] starts = new int[size+1];
@@ -131,6 +135,45 @@
     return new MultiSortedDocValues(values, starts, map);
   }
   
+  @Override
+  public SortedSetDocValues getSortedSetDocValues(String field) throws IOException {
+    ensureOpen();
+    OrdinalMap map = null;
+    synchronized (cachedOrdMaps) {
+      map = cachedOrdMaps.get(field);
+      if (map == null) {
+        // uncached, or not a multi dv
+        SortedSetDocValues dv = MultiDocValues.getSortedSetValues(in, field);
+        if (dv instanceof MultiSortedSetDocValues) {
+          map = ((MultiSortedSetDocValues)dv).mapping;
+          if (map.owner == getCoreCacheKey()) {
+            cachedOrdMaps.put(field, map);
+          }
+        }
+        return dv;
+      }
+    }
+    // cached ordinal map
+    if (getFieldInfos().fieldInfo(field).getDocValuesType() != DocValuesType.SORTED_SET) {
+      return null;
+    }
+    assert map != null;
+    int size = in.leaves().size();
+    final SortedSetDocValues[] values = new SortedSetDocValues[size];
+    final int[] starts = new int[size+1];
+    for (int i = 0; i < size; i++) {
+      AtomicReaderContext context = in.leaves().get(i);
+      SortedSetDocValues v = context.reader().getSortedSetDocValues(field);
+      if (v == null) {
+        v = SortedSetDocValues.EMPTY;
+      }
+      values[i] = v;
+      starts[i] = context.docBase;
+    }
+    starts[size] = maxDoc();
+    return new MultiSortedSetDocValues(values, starts, map);
+  }
+  
   // TODO: this could really be a weak map somewhere else on the coreCacheKey,
   // but do we really need to optimize slow-wrapper any more?
   private final Map<String,OrdinalMap> cachedOrdMaps = new HashMap<String,OrdinalMap>();
diff --git a/lucene/core/src/java/org/apache/lucene/index/SortedSetDocValues.java b/lucene/core/src/java/org/apache/lucene/index/SortedSetDocValues.java
new file mode 100644
index 0000000..ce10caa
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/index/SortedSetDocValues.java
@@ -0,0 +1,120 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.BytesRef;
+
+/**
+ * A per-document set of presorted byte[] values.
+ * <p>
+ * Per-document values in a SortedSetDocValues are deduplicated, dereferenced,
+ * and sorted into a dictionary of unique values. A pointer to the
+ * dictionary value (ordinal) can be retrieved for each document. Ordinals
+ * are dense and in increasing sorted order.
+ */
+public abstract class SortedSetDocValues {
+  
+  /** Sole constructor. (For invocation by subclass 
+   * constructors, typically implicit.) */
+  protected SortedSetDocValues() {}
+
+  /** When returned by {@link #nextOrd()} it means there are no more 
+   * ordinals for the document.
+   */
+  public static final long NO_MORE_ORDS = -1;
+
+  /** 
+   * Returns the next ordinal for the current document (previously
+   * set by {@link #setDocument(int)}).
+   * @return next ordinal for the document, or {@link #NO_MORE_ORDS}. 
+   *         ordinals are dense, start at 0, then increment by 1 for 
+   *         the next value in sorted order. 
+   */
+  public abstract long nextOrd();
+  
+  /** 
+   * Sets iteration to the specified docID 
+   * @param docID document ID 
+   */
+  public abstract void setDocument(int docID);
+
+  /** Retrieves the value for the specified ordinal.
+   * @param ord ordinal to lookup
+   * @param result will be populated with the ordinal's value
+   * @see #nextOrd
+   */
+  public abstract void lookupOrd(long ord, BytesRef result);
+
+  /**
+   * Returns the number of unique values.
+   * @return number of unique values in this SortedSetDocValues. This is
+   *         also equivalent to one plus the maximum ordinal.
+   */
+  public abstract long getValueCount();
+
+
+  /** An empty SortedSetDocValues which returns {@link #NO_MORE_ORDS} for every document */
+  public static final SortedSetDocValues EMPTY = new SortedSetDocValues() {
+
+    @Override
+    public long nextOrd() {
+      return NO_MORE_ORDS;
+    }
+
+    @Override
+    public void setDocument(int docID) {}
+
+    @Override
+    public void lookupOrd(long ord, BytesRef result) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    @Override
+    public long getValueCount() {
+      return 0;
+    }
+  };
+
+  /** If {@code key} exists, returns its ordinal, else
+   *  returns {@code -insertionPoint-1}, like {@code
+   *  Arrays.binarySearch}.
+   *
+   *  @param key Key to look up
+   **/
+  public long lookupTerm(BytesRef key) {
+    BytesRef spare = new BytesRef();
+    long low = 0;
+    long high = getValueCount()-1;
+
+    while (low <= high) {
+      long mid = (low + high) >>> 1;
+      lookupOrd(mid, spare);
+      int cmp = spare.compareTo(key);
+
+      if (cmp < 0) {
+        low = mid + 1;
+      } else if (cmp > 0) {
+        high = mid - 1;
+      } else {
+        return mid; // key found
+      }
+    }
+
+    return -(low + 1);  // key not found.
+  }
+}
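
lookupTerm follows Arrays.binarySearch semantics; a small sketch of decoding its
result (seekDemo is an illustrative name):

    import org.apache.lucene.index.SortedSetDocValues;
    import org.apache.lucene.util.BytesRef;

    static void seekDemo(SortedSetDocValues dv, BytesRef key) {
      long ord = dv.lookupTerm(key);
      if (ord >= 0) {
        System.out.println("found, ord=" + ord);
      } else {
        long insertionPoint = -ord - 1;   // ord of the first term greater than key
        System.out.println("not found, would insert at ord=" + insertionPoint);
      }
    }
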
diff --git a/lucene/core/src/java/org/apache/lucene/index/SortedSetDocValuesTermsEnum.java b/lucene/core/src/java/org/apache/lucene/index/SortedSetDocValuesTermsEnum.java
new file mode 100644
index 0000000..589bc10
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/index/SortedSetDocValuesTermsEnum.java
@@ -0,0 +1,139 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.BytesRef;
+
+/** Implements a {@link TermsEnum} wrapping a provided
+ * {@link SortedSetDocValues}. */
+
+public class SortedSetDocValuesTermsEnum extends TermsEnum {
+  private final SortedSetDocValues values;
+  private long currentOrd = -1;
+  private final BytesRef term = new BytesRef();
+
+  /** Creates a new TermsEnum over the provided values */
+  public SortedSetDocValuesTermsEnum(SortedSetDocValues values) {
+    this.values = values;
+  }
+
+  @Override
+  public SeekStatus seekCeil(BytesRef text, boolean useCache /* ignored */) throws IOException {
+    long ord = values.lookupTerm(text);
+    if (ord >= 0) {
+      currentOrd = ord;
+      term.offset = 0;
+      // TODO: is there a cleaner way?
+      // term.bytes may be pointing to codec-private byte[]
+      // storage, so we must force new byte[] allocation:
+      term.bytes = new byte[text.length];
+      term.copyBytes(text);
+      return SeekStatus.FOUND;
+    } else {
+      currentOrd = -ord-1;
+      if (currentOrd == values.getValueCount()) {
+        return SeekStatus.END;
+      } else {
+        // TODO: hmm can we avoid this "extra" lookup?:
+        values.lookupOrd(currentOrd, term);
+        return SeekStatus.NOT_FOUND;
+      }
+    }
+  }
+
+  @Override
+  public boolean seekExact(BytesRef text, boolean useCache) throws IOException {
+    long ord = values.lookupTerm(text);
+    if (ord >= 0) {
+      currentOrd = ord;
+      // also update term() to reflect the seeked-to term (mirrors seekCeil)
+      term.offset = 0;
+      term.bytes = new byte[text.length];
+      term.copyBytes(text);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void seekExact(long ord) throws IOException {
+    assert ord >= 0 && ord < values.getValueCount();
+    currentOrd = ord; // ords are longs; avoid a narrowing cast
+    values.lookupOrd(currentOrd, term);
+  }
+
+  @Override
+  public BytesRef next() throws IOException {
+    currentOrd++;
+    if (currentOrd >= values.getValueCount()) {
+      return null;
+    }
+    values.lookupOrd(currentOrd, term);
+    return term;
+  }
+
+  @Override
+  public BytesRef term() throws IOException {
+    return term;
+  }
+
+  @Override
+  public long ord() throws IOException {
+    return currentOrd;
+  }
+
+  @Override
+  public int docFreq() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long totalTermFreq() {
+    return -1;
+  }
+
+  @Override
+  public DocsEnum docs(Bits liveDocs, DocsEnum reuse, int flags) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public DocsAndPositionsEnum docsAndPositions(Bits liveDocs, DocsAndPositionsEnum reuse, int flags) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Comparator<BytesRef> getComparator() {
+    return BytesRef.getUTF8SortedAsUnicodeComparator();
+  }
+
+  @Override
+  public void seekExact(BytesRef term, TermState state) throws IOException {
+    assert state != null && state instanceof OrdTermState;
+    this.seekExact(((OrdTermState)state).ord);
+  }
+
+  @Override
+  public TermState termState() throws IOException {
+    OrdTermState state = new OrdTermState();
+    state.ord = currentOrd;
+    return state;
+  }
+}
+
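
For context, a minimal usage sketch (not part of the patch; the reader variable and the field name "dv" are assumed): the wrapper lets existing TermsEnum-based code walk the unique values of a sorted-set field in ord order.

    SortedSetDocValues dv = reader.getSortedSetDocValues("dv");
    TermsEnum te = new SortedSetDocValuesTermsEnum(dv);
    BytesRef term;
    while ((term = te.next()) != null) {
      // ords are dense: 0, 1, 2, ..., getValueCount()-1
      System.out.println(te.ord() + " -> " + term.utf8ToString());
    }
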
diff --git a/lucene/core/src/java/org/apache/lucene/index/SortedSetDocValuesWriter.java b/lucene/core/src/java/org/apache/lucene/index/SortedSetDocValuesWriter.java
new file mode 100644
index 0000000..f9f6135
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/index/SortedSetDocValuesWriter.java
@@ -0,0 +1,305 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.lucene.codecs.DocValuesConsumer;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.ByteBlockPool;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefHash.DirectBytesStartArray;
+import org.apache.lucene.util.BytesRefHash;
+import org.apache.lucene.util.Counter;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.AppendingLongBuffer;
+
+/** Buffers up pending byte[]s per doc, dereferencing and sorting them via
+ *  int ords, then flushes when the segment flushes. */
+class SortedSetDocValuesWriter extends DocValuesWriter {
+  final BytesRefHash hash;
+  private AppendingLongBuffer pending; // stream of all ords
+  private AppendingLongBuffer pendingCounts; // ords per doc
+  private final Counter iwBytesUsed;
+  private long bytesUsed; // this only tracks differences in 'pending' and 'pendingCounts'
+  private final FieldInfo fieldInfo;
+  private int currentDoc;
+  private int currentValues[] = new int[8];
+  private int currentUpto = 0;
+  private int maxCount = 0;
+
+  public SortedSetDocValuesWriter(FieldInfo fieldInfo, Counter iwBytesUsed) {
+    this.fieldInfo = fieldInfo;
+    this.iwBytesUsed = iwBytesUsed;
+    hash = new BytesRefHash(
+        new ByteBlockPool(
+            new ByteBlockPool.DirectTrackingAllocator(iwBytesUsed)),
+            BytesRefHash.DEFAULT_CAPACITY,
+            new DirectBytesStartArray(BytesRefHash.DEFAULT_CAPACITY, iwBytesUsed));
+    pending = new AppendingLongBuffer();
+    pendingCounts = new AppendingLongBuffer();
+    bytesUsed = pending.ramBytesUsed() + pendingCounts.ramBytesUsed();
+    iwBytesUsed.addAndGet(bytesUsed);
+  }
+
+  public void addValue(int docID, BytesRef value) {
+    if (value == null) {
+      throw new IllegalArgumentException("field \"" + fieldInfo.name + "\": null value not allowed");
+    }
+    if (value.length > (BYTE_BLOCK_SIZE - 2)) {
+      throw new IllegalArgumentException("DocValuesField \"" + fieldInfo.name + "\" is too large, must be <= " + (BYTE_BLOCK_SIZE - 2));
+    }
+    
+    if (docID != currentDoc) {
+      finishCurrentDoc();
+    }
+
+    // Fill in any holes:
+    while(currentDoc < docID) {
+      pendingCounts.add(0); // no values
+      currentDoc++;
+    }
+
+    addOneValue(value);
+    updateBytesUsed();
+  }
+  
+  // finalize currentDoc: sort the doc's term ids and drop duplicates
+  private void finishCurrentDoc() {
+    Arrays.sort(currentValues, 0, currentUpto);
+    int lastValue = -1;
+    int count = 0;
+    for (int i = 0; i < currentUpto; i++) {
+      int v = currentValues[i];
+      // if it's not a duplicate
+      if (v != lastValue) {
+        pending.add(v); // record the ord
+        count++;
+      }
+      lastValue = v;
+    }
+    // record the number of unique ords for this doc
+    pendingCounts.add(count);
+    maxCount = Math.max(maxCount, count);
+    currentUpto = 0;
+    currentDoc++;
+  }
+
+  @Override
+  public void finish(int maxDoc) {
+    finishCurrentDoc();
+    
+    // fill in any holes
+    for (int i = currentDoc; i < maxDoc; i++) {
+      pendingCounts.add(0); // no values
+    }
+  }
+
+  private void addOneValue(BytesRef value) {
+    int ord = hash.add(value);
+    if (ord < 0) {
+      ord = -ord-1;
+    } else {
+      // reserve additional space for each unique value:
+      // 1. when indexing, when hash is 50% full, rehash() suddenly needs 2*size ints.
+      //    TODO: can this same OOM happen in THPF?
+      // 2. when flushing, we need 1 int per value (slot in the ordMap).
+      iwBytesUsed.addAndGet(2 * RamUsageEstimator.NUM_BYTES_INT);
+    }
+    
+    if (currentUpto == currentValues.length) {
+      currentValues = ArrayUtil.grow(currentValues, currentValues.length+1);
+      // reserve additional space for max # values per-doc
+      // when flushing, we need an int[] to sort the mapped-ords within the doc
+      iwBytesUsed.addAndGet((currentValues.length - currentUpto) * 2 * RamUsageEstimator.NUM_BYTES_INT);
+    }
+    
+    currentValues[currentUpto] = ord;
+    currentUpto++;
+  }
+  
+  private void updateBytesUsed() {
+    final long newBytesUsed = pending.ramBytesUsed() + pendingCounts.ramBytesUsed();
+    iwBytesUsed.addAndGet(newBytesUsed - bytesUsed);
+    bytesUsed = newBytesUsed;
+  }
+
+  @Override
+  public void flush(SegmentWriteState state, DocValuesConsumer dvConsumer) throws IOException {
+    final int maxDoc = state.segmentInfo.getDocCount();
+    final int maxCountPerDoc = maxCount;
+    assert pendingCounts.size() == maxDoc;
+    final int valueCount = hash.size();
+
+    final int[] sortedValues = hash.sort(BytesRef.getUTF8SortedAsUnicodeComparator());
+    final int[] ordMap = new int[valueCount];
+
+    for(int ord=0;ord<valueCount;ord++) {
+      ordMap[sortedValues[ord]] = ord;
+    }
+
+    dvConsumer.addSortedSetField(fieldInfo,
+
+                              // ord -> value
+                              new Iterable<BytesRef>() {
+                                @Override
+                                public Iterator<BytesRef> iterator() {
+                                  return new ValuesIterator(sortedValues, valueCount);
+                                }
+                              },
+                              
+                              // doc -> ordCount
+                              new Iterable<Number>() {
+                                @Override
+                                public Iterator<Number> iterator() {
+                                  return new OrdCountIterator(maxDoc);
+                                }
+                              },
+
+                              // ords
+                              new Iterable<Number>() {
+                                @Override
+                                public Iterator<Number> iterator() {
+                                  return new OrdsIterator(ordMap, maxCountPerDoc);
+                                }
+                              });
+  }
+
+  @Override
+  public void abort() {
+  }
+  
+  // iterates over the unique values we have in ram
+  private class ValuesIterator implements Iterator<BytesRef> {
+    final int sortedValues[];
+    final BytesRef scratch = new BytesRef();
+    final int valueCount;
+    int ordUpto;
+    
+    ValuesIterator(int sortedValues[], int valueCount) {
+      this.sortedValues = sortedValues;
+      this.valueCount = valueCount;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return ordUpto < valueCount;
+    }
+
+    @Override
+    public BytesRef next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      hash.get(sortedValues[ordUpto], scratch);
+      ordUpto++;
+      return scratch;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+  
+  // iterates over the ords for each doc we have in ram
+  private class OrdsIterator implements Iterator<Number> {
+    final AppendingLongBuffer.Iterator iter = pending.iterator();
+    final AppendingLongBuffer.Iterator counts = pendingCounts.iterator();
+    final int ordMap[];
+    final long numOrds;
+    long ordUpto;
+    
+    final int currentDoc[];
+    int currentUpto;
+    int currentLength;
+    
+    OrdsIterator(int ordMap[], int maxCount) {
+      this.currentDoc = new int[maxCount];
+      this.ordMap = ordMap;
+      this.numOrds = pending.size();
+    }
+    
+    @Override
+    public boolean hasNext() {
+      return ordUpto < numOrds;
+    }
+
+    @Override
+    public Number next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      while (currentUpto == currentLength) {
+        // refill next doc, and sort remapped ords within the doc.
+        currentUpto = 0;
+        currentLength = (int) counts.next();
+        for (int i = 0; i < currentLength; i++) {
+          currentDoc[i] = ordMap[(int) iter.next()];
+        }
+        Arrays.sort(currentDoc, 0, currentLength);
+      }
+      int ord = currentDoc[currentUpto];
+      currentUpto++;
+      ordUpto++;
+      // TODO: make reusable Number
+      return ord;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+  
+  private class OrdCountIterator implements Iterator<Number> {
+    final AppendingLongBuffer.Iterator iter = pendingCounts.iterator();
+    final int maxDoc;
+    int docUpto;
+    
+    OrdCountIterator(int maxDoc) {
+      this.maxDoc = maxDoc;
+      assert pendingCounts.size() == maxDoc;
+    }
+    
+    @Override
+    public boolean hasNext() {
+      return docUpto < maxDoc;
+    }
+
+    @Override
+    public Number next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      docUpto++;
+      // TODO: make reusable Number
+      return iter.next();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
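
A small worked sketch of the per-document pass in finishCurrentDoc() (illustrative only, on plain ints rather than hash ords): sorting first makes duplicate detection a single comparison against the previous value, and `last = -1` is a safe sentinel because ords are non-negative.

    static int dedupCount(int[] current, int upto) {
      java.util.Arrays.sort(current, 0, upto);
      int last = -1, count = 0;
      for (int i = 0; i < upto; i++) {
        if (current[i] != last) {   // first occurrence only
          count++;                  // the writer does pending.add(v) here
        }
        last = current[i];
      }
      return count;                 // recorded into pendingCounts
    }
    // dedupCount(new int[] {5, 2, 5, 2, 7}, 5) == 3 -> the doc contributes ords {2, 5, 7}

At flush time those buffers are replayed as the three streams addSortedSetField expects: ord -> value, doc -> ordCount, and the flat stream of per-doc ords remapped through ordMap.
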
diff --git a/lucene/core/src/java/org/apache/lucene/search/FieldCache.java b/lucene/core/src/java/org/apache/lucene/search/FieldCache.java
index 62edc7f..4a3100a 100644
--- a/lucene/core/src/java/org/apache/lucene/search/FieldCache.java
+++ b/lucene/core/src/java/org/apache/lucene/search/FieldCache.java
@@ -29,6 +29,7 @@
 import org.apache.lucene.index.BinaryDocValues;
 import org.apache.lucene.index.DocTermOrds;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.util.Bits;
@@ -670,7 +671,7 @@
    * @return a {@link DocTermOrds} instance
    * @throws IOException  If any error occurs.
    */
-  public DocTermOrds getDocTermOrds(AtomicReader reader, String field) throws IOException;
+  public SortedSetDocValues getDocTermOrds(AtomicReader reader, String field) throws IOException;
 
   /**
    * EXPERT: A unique Identifier/Description for each item in the FieldCache. 
diff --git a/lucene/core/src/java/org/apache/lucene/search/FieldCacheImpl.java b/lucene/core/src/java/org/apache/lucene/search/FieldCacheImpl.java
index 5b3ed3f..e444956 100644
--- a/lucene/core/src/java/org/apache/lucene/search/FieldCacheImpl.java
+++ b/lucene/core/src/java/org/apache/lucene/search/FieldCacheImpl.java
@@ -33,7 +33,9 @@
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.index.SingletonSortedSetDocValues;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.util.ArrayUtil;
@@ -1363,8 +1365,30 @@
     }
   }
 
-  public DocTermOrds getDocTermOrds(AtomicReader reader, String field) throws IOException {
-    return (DocTermOrds) caches.get(DocTermOrds.class).get(reader, new CacheKey(field, null), false);
+  // TODO: if a DocTermsIndex was already created, we
+  // should share it...
+  public SortedSetDocValues getDocTermOrds(AtomicReader reader, String field) throws IOException {
+    SortedSetDocValues dv = reader.getSortedSetDocValues(field);
+    if (dv != null) {
+      return dv;
+    }
+    
+    SortedDocValues sdv = reader.getSortedDocValues(field);
+    if (sdv != null) {
+      return new SingletonSortedSetDocValues(sdv);
+    }
+    
+    final FieldInfo info = reader.getFieldInfos().fieldInfo(field);
+    if (info == null) {
+      return SortedSetDocValues.EMPTY;
+    } else if (info.hasDocValues()) {
+      throw new IllegalStateException("Type mismatch: " + field + " was indexed as " + info.getDocValuesType());
+    } else if (!info.isIndexed()) {
+      return SortedSetDocValues.EMPTY;
+    }
+    
+    DocTermOrds dto = (DocTermOrds) caches.get(DocTermOrds.class).get(reader, new CacheKey(field, null), false);
+    return dto.iterator(dto.getOrdTermsEnum(reader));
   }
 
   static final class DocTermOrdsCache extends Cache {
@@ -1375,7 +1399,6 @@
     @Override
     protected Object createValue(AtomicReader reader, CacheKey key, boolean setDocsWithField /* ignored */)
         throws IOException {
-      // No DocValues impl yet (DocValues are single valued...):
       return new DocTermOrds(reader, key.field);
     }
   }
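
Whichever branch produces the result (real SORTED_SET doc values, a SortedDocValues singleton, or the uninverted DocTermOrds), callers now consume it through one protocol. A hedged sketch, assuming an AtomicReader variable, a docID in range, and a field name "tags":

    SortedSetDocValues ssdv = FieldCache.DEFAULT.getDocTermOrds(atomicReader, "tags");
    BytesRef scratch = new BytesRef();
    ssdv.setDocument(docID);
    long ord;
    while ((ord = ssdv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
      ssdv.lookupOrd(ord, scratch);  // resolve the ord to its term bytes
    }
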
diff --git a/lucene/core/src/java/org/apache/lucene/util/packed/AbstractAppendingLongBuffer.java b/lucene/core/src/java/org/apache/lucene/util/packed/AbstractAppendingLongBuffer.java
new file mode 100644
index 0000000..087154d
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/util/packed/AbstractAppendingLongBuffer.java
@@ -0,0 +1,150 @@
+package org.apache.lucene.util.packed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Arrays;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.RamUsageEstimator;
+
+/** Common functionality shared by {@link AppendingLongBuffer} and {@link MonotonicAppendingLongBuffer}. */
+abstract class AbstractAppendingLongBuffer {
+
+  static final int BLOCK_BITS = 10;
+  static final int MAX_PENDING_COUNT = 1 << BLOCK_BITS;
+  static final int BLOCK_MASK = MAX_PENDING_COUNT - 1;
+
+  long[] minValues;
+  PackedInts.Reader[] deltas;
+  private long deltasBytes;
+  int valuesOff;
+  long[] pending;
+  int pendingOff;
+
+  AbstractAppendingLongBuffer(int initialBlockCount) {
+    minValues = new long[initialBlockCount];
+    deltas = new PackedInts.Reader[initialBlockCount];
+    pending = new long[MAX_PENDING_COUNT];
+    valuesOff = 0;
+    pendingOff = 0;
+  }
+
+  /** Get the number of values that have been added to the buffer. */
+  public final long size() {
+    return valuesOff * (long) MAX_PENDING_COUNT + pendingOff;
+  }
+
+  /** Append a value to this buffer. */
+  public final void add(long l) {
+    if (pendingOff == MAX_PENDING_COUNT) {
+      // check size
+      if (deltas.length == valuesOff) {
+        final int newLength = ArrayUtil.oversize(valuesOff + 1, 8);
+        grow(newLength);
+      }
+      packPendingValues();
+      if (deltas[valuesOff] != null) {
+        deltasBytes += deltas[valuesOff].ramBytesUsed();
+      }
+      ++valuesOff;
+      // reset pending buffer
+      pendingOff = 0;
+    }
+    pending[pendingOff++] = l;
+  }
+
+  void grow(int newBlockCount) {
+    minValues = Arrays.copyOf(minValues, newBlockCount);
+    deltas = Arrays.copyOf(deltas, newBlockCount);
+  }
+
+  abstract void packPendingValues();
+
+  /** Get a value from this buffer. */
+  public final long get(long index) {
+    if (index < 0 || index >= size()) {
+      throw new IndexOutOfBoundsException("" + index);
+    }
+    int block = (int) (index >> BLOCK_BITS);
+    int element = (int) (index & BLOCK_MASK);
+    return get(block, element);
+  }
+
+  abstract long get(int block, int element);
+
+  abstract Iterator iterator();
+
+  abstract class Iterator {
+
+    long[] currentValues;
+    int vOff, pOff;
+
+    Iterator() {
+      vOff = pOff = 0;
+      if (valuesOff == 0) {
+        currentValues = pending;
+      } else {
+        currentValues = new long[MAX_PENDING_COUNT];
+        fillValues();
+      }
+    }
+
+    abstract void fillValues();
+
+    /** Whether or not there are remaining values. */
+    public final boolean hasNext() {
+      return vOff < valuesOff || (vOff == valuesOff && pOff < pendingOff);
+    }
+
+    /** Return the next long in the buffer. */
+    public final long next() {
+      assert hasNext();
+      long result = currentValues[pOff++];
+      if (pOff == MAX_PENDING_COUNT) {
+        vOff += 1;
+        pOff = 0;
+        if (vOff <= valuesOff) {
+          fillValues();
+        }
+      }
+      return result;
+    }
+
+  }
+
+  long baseRamBytesUsed() {
+    return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+        + 3 * RamUsageEstimator.NUM_BYTES_OBJECT_REF // the 3 arrays
+        + 2 * RamUsageEstimator.NUM_BYTES_INT; // the 2 offsets
+  }
+
+  /**
+   * Return the number of bytes used by this instance.
+   */
+  public long ramBytesUsed() {
+    // TODO: this is called per-doc-per-norms/dv-field, can we optimize this?
+    long bytesUsed = RamUsageEstimator.alignObjectSize(baseRamBytesUsed())
+        + RamUsageEstimator.NUM_BYTES_LONG // deltasBytes
+        + RamUsageEstimator.sizeOf(pending)
+        + RamUsageEstimator.sizeOf(minValues)
+        + RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) RamUsageEstimator.NUM_BYTES_OBJECT_REF * deltas.length); // deltas
+
+    return bytesUsed + deltasBytes;
+  }
+
+}
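
The addressing scheme is worth spelling out once (illustrative arithmetic only, using the class's own constants): with BLOCK_BITS = 10, every 1024 appended values form one packed block, and get(long) splits an index into a block number and an offset.

    long index = 3000;
    int block = (int) (index >> BLOCK_BITS);   // 3000 >> 10 == 2   -> the third packed block
    int element = (int) (index & BLOCK_MASK);  // 3000 & 1023 == 952 -> slot inside that block
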
diff --git a/lucene/core/src/java/org/apache/lucene/util/packed/AppendingLongBuffer.java b/lucene/core/src/java/org/apache/lucene/util/packed/AppendingLongBuffer.java
index 258eeba..978fc32 100644
--- a/lucene/core/src/java/org/apache/lucene/util/packed/AppendingLongBuffer.java
+++ b/lucene/core/src/java/org/apache/lucene/util/packed/AppendingLongBuffer.java
@@ -19,72 +19,33 @@
 
 import java.util.Arrays;
 
-import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.RamUsageEstimator;
-
 /**
  * Utility class to buffer a list of signed longs in memory. This class only
- * supports appending.
+ * supports appending and is optimized for the case where values are close to
+ * each other.
  * @lucene.internal
  */
-public class AppendingLongBuffer {
-
-  private static final int BLOCK_BITS = 10;
-  private static final int MAX_PENDING_COUNT = 1 << BLOCK_BITS;
-  private static final int BLOCK_MASK = MAX_PENDING_COUNT - 1;
-
-  private long[] minValues;
-  private PackedInts.Reader[] values;
-  private long valuesBytes;
-  private int valuesOff;
-  private long[] pending;
-  private int pendingOff;
+public final class AppendingLongBuffer extends AbstractAppendingLongBuffer {
 
   /** Sole constructor. */
   public AppendingLongBuffer() {
-    minValues = new long[16];
-    values = new PackedInts.Reader[16];
-    pending = new long[MAX_PENDING_COUNT];
-    valuesOff = 0;
-    pendingOff = 0;
+    super(16);
   }
 
-  /** Append a value to this buffer. */
-  public void add(long l) {
-    if (pendingOff == MAX_PENDING_COUNT) {
-      packPendingValues();
-    }
-    pending[pendingOff++] = l;
-  }
-  
-  /** Get a value from this buffer. 
-   *  <p>
-   *  <b>NOTE</b>: This class is not really designed for random access!
-   *  You will likely get better performance by using packed ints in another way! */
-  public long get(int index) {
-    assert index < size(); // TODO: do a better check, and throw IndexOutOfBoundsException?
-                           // This class is currently only used by the indexer.
-    int block = index >> BLOCK_BITS;
-    int element = index & BLOCK_MASK;
+  @Override
+  long get(int block, int element) {
     if (block == valuesOff) {
       return pending[element];
-    } else if (values[block] == null) {
+    } else if (deltas[block] == null) {
       return minValues[block];
     } else {
-      return minValues[block] + values[block].get(element);
+      return minValues[block] + deltas[block].get(element);
     }
   }
 
-  private void packPendingValues() {
+  void packPendingValues() {
     assert pendingOff == MAX_PENDING_COUNT;
 
-    // check size
-    if (values.length == valuesOff) {
-      final int newLength = ArrayUtil.oversize(valuesOff + 1, 8);
-      minValues = Arrays.copyOf(minValues, newLength);
-      values = Arrays.copyOf(values, newLength);
-    }
-
     // compute max delta
     long minValue = pending[0];
     long maxValue = pending[0];
@@ -105,18 +66,8 @@
       for (int i = 0; i < pendingOff; ) {
         i += mutable.set(i, pending, i, pendingOff - i);
       }
-      values[valuesOff] = mutable;
-      valuesBytes += mutable.ramBytesUsed();
+      deltas[valuesOff] = mutable;
     }
-    ++valuesOff;
-
-    // reset pending buffer
-    pendingOff = 0;
-  }
-
-  /** Get the number of values that have been added to the buffer. */
-  public int size() {
-    return valuesOff * MAX_PENDING_COUNT + pendingOff;
   }
 
   /** Return an iterator over the values of this buffer. */
@@ -125,29 +76,20 @@
   }
 
   /** A long iterator. */
-  public class Iterator {
-
-    long[] currentValues;
-    int vOff, pOff;
+  public final class Iterator extends AbstractAppendingLongBuffer.Iterator {
 
     private Iterator() {
-      vOff = pOff = 0;
-      if (valuesOff == 0) {
-        currentValues = pending;
-      } else {
-        currentValues = new long[MAX_PENDING_COUNT];
-        fillValues();
-      }
+      super();
     }
 
-    private void fillValues() {
+    void fillValues() {
       if (vOff == valuesOff) {
         currentValues = pending;
-      } else if (values[vOff] == null) {
+      } else if (deltas[vOff] == null) {
         Arrays.fill(currentValues, minValues[vOff]);
       } else {
         for (int k = 0; k < MAX_PENDING_COUNT; ) {
-          k += values[vOff].get(k, currentValues, k, MAX_PENDING_COUNT - k);
+          k += deltas[vOff].get(k, currentValues, k, MAX_PENDING_COUNT - k);
         }
         for (int k = 0; k < MAX_PENDING_COUNT; ++k) {
           currentValues[k] += minValues[vOff];
@@ -155,42 +97,6 @@
       }
     }
 
-    /** Whether or not there are remaining values. */
-    public boolean hasNext() {
-      return vOff < valuesOff || (vOff == valuesOff && pOff < pendingOff);
-    }
-
-    /** Return the next long in the buffer. */
-    public long next() {
-      assert hasNext();
-      long result = currentValues[pOff++];
-      if (pOff == MAX_PENDING_COUNT) {
-        vOff += 1;
-        pOff = 0;
-        if (vOff <= valuesOff) {
-          fillValues();
-        }
-      }
-      return result;
-    }
-
-  }
-
-  /**
-   * Return the number of bytes used by this instance.
-   */
-  public long ramBytesUsed() {
-    // TODO: this is called per-doc-per-norms/dv-field, can we optimize this?
-    long bytesUsed = RamUsageEstimator.alignObjectSize(
-        RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
-        + 3 * RamUsageEstimator.NUM_BYTES_OBJECT_REF // the 3 arrays
-        + 2 * RamUsageEstimator.NUM_BYTES_INT) // the 2 offsets
-        + RamUsageEstimator.NUM_BYTES_LONG // valuesBytes
-        + RamUsageEstimator.sizeOf(pending)
-        + RamUsageEstimator.sizeOf(minValues)
-        + RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) RamUsageEstimator.NUM_BYTES_OBJECT_REF * values.length); // values
-
-    return bytesUsed + valuesBytes;
   }
 
 }
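
The refactor keeps AppendingLongBuffer's behavior: each full block stores its minimum plus packed deltas, and a block whose values are all equal keeps deltas[block] == null and costs only the minimum. A rough sanity sketch under those semantics:

    AppendingLongBuffer buf = new AppendingLongBuffer();
    for (int i = 0; i < 4096; ++i) {
      buf.add(100 + (i % 3));                 // tiny deltas above a shared minimum
    }
    AppendingLongBuffer.Iterator it = buf.iterator();
    for (int i = 0; it.hasNext(); ++i) {
      assert it.next() == 100 + (i % 3);      // round-trips exactly
    }
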
diff --git a/lucene/core/src/java/org/apache/lucene/util/packed/MonotonicAppendingLongBuffer.java b/lucene/core/src/java/org/apache/lucene/util/packed/MonotonicAppendingLongBuffer.java
new file mode 100644
index 0000000..4b00994
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/util/packed/MonotonicAppendingLongBuffer.java
@@ -0,0 +1,139 @@
+package org.apache.lucene.util.packed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Arrays;
+
+import org.apache.lucene.util.RamUsageEstimator;
+
+/**
+ * Utility class to buffer signed longs in memory, which is optimized for the
+ * case where the sequence is monotonic, although it can encode any sequence of
+ * arbitrary longs. It only supports appending.
+ * @lucene.internal
+ */
+public final class MonotonicAppendingLongBuffer extends AbstractAppendingLongBuffer {
+
+  static long zigZagDecode(long n) {
+    return ((n >>> 1) ^ -(n & 1));
+  }
+  
+  static long zigZagEncode(long n) {
+    return (n >> 63) ^ (n << 1);
+  }
+
+  private float[] averages;
+
+  /** Sole constructor. */
+  public MonotonicAppendingLongBuffer() {
+    super(16);
+    averages = new float[16];
+  }
+  
+  @Override
+  long get(int block, int element) {
+    if (block == valuesOff) {
+      return pending[element];
+    } else {
+      final long base = minValues[block] + (long) (averages[block] * (long) element);
+      if (deltas[block] == null) {
+        return base;
+      } else {
+        return base + zigZagDecode(deltas[block].get(element));
+      }
+    }
+  }
+
+  @Override
+  void grow(int newBlockCount) {
+    super.grow(newBlockCount);
+    this.averages = Arrays.copyOf(averages, newBlockCount);
+  }
+
+  @Override
+  void packPendingValues() {
+    assert pendingOff == MAX_PENDING_COUNT;
+
+    minValues[valuesOff] = pending[0];
+    averages[valuesOff] = (float) (pending[BLOCK_MASK] - pending[0]) / BLOCK_MASK;
+
+    for (int i = 0; i < MAX_PENDING_COUNT; ++i) {
+      pending[i] = zigZagEncode(pending[i] - minValues[valuesOff] - (long) (averages[valuesOff] * (long) i));
+    }
+    long maxDelta = 0;
+    for (int i = 0; i < MAX_PENDING_COUNT; ++i) {
+      if (pending[i] < 0) {
+        maxDelta = -1;
+        break;
+      } else {
+        maxDelta = Math.max(maxDelta, pending[i]);
+      }
+    }
+    if (maxDelta != 0) {
+      final int bitsRequired = maxDelta < 0 ? 64 : PackedInts.bitsRequired(maxDelta);
+      final PackedInts.Mutable mutable = PackedInts.getMutable(pendingOff, bitsRequired, PackedInts.COMPACT);
+      for (int i = 0; i < pendingOff; ) {
+        i += mutable.set(i, pending, i, pendingOff - i);
+      }
+      deltas[valuesOff] = mutable;
+    }
+  }
+
+  /** Return an iterator over the values of this buffer. */
+  public Iterator iterator() {
+    return new Iterator();
+  }
+
+  /** A long iterator. */
+  public final class Iterator extends AbstractAppendingLongBuffer.Iterator {
+
+    Iterator() {
+      super();
+    }
+
+    void fillValues() {
+      if (vOff == valuesOff) {
+        currentValues = pending;
+      } else if (deltas[vOff] == null) {
+        for (int k = 0; k < MAX_PENDING_COUNT; ++k) {
+          currentValues[k] = minValues[vOff] + (long) (averages[vOff] * (long) k);
+        }
+      } else {
+        for (int k = 0; k < MAX_PENDING_COUNT; ) {
+          k += deltas[vOff].get(k, currentValues, k, MAX_PENDING_COUNT - k);
+        }
+        for (int k = 0; k < MAX_PENDING_COUNT; ++k) {
+          currentValues[k] = minValues[vOff] + (long) (averages[vOff] * (long) k) + zigZagDecode(currentValues[k]);
+        }
+      }
+    }
+
+  }
+
+  @Override
+  long baseRamBytesUsed() {
+    return super.baseRamBytesUsed()
+        + RamUsageEstimator.NUM_BYTES_OBJECT_REF; // the additional array
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return super.ramBytesUsed()
+        + RamUsageEstimator.sizeOf(averages);
+  }
+
+}
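
The decoding model here is value = minValues[block] + (long) (averages[block] * element) + zigZagDecode(delta): a per-block linear fit with zig-zag-coded residues. A worked example of the zig-zag pair (values checked against the formulas above):

    // zigZagEncode(0)  == 0      zigZagDecode(0) == 0
    // zigZagEncode(-1) == 1      zigZagDecode(1) == -1
    // zigZagEncode(1)  == 2      zigZagDecode(2) == 1
    // zigZagEncode(-2) == 3      zigZagDecode(3) == -2

Small residues of either sign map to small non-negative longs, so PackedInts.bitsRequired(maxDelta) stays low whenever the sequence is close to linear.
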
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocTermOrds.java b/lucene/core/src/test/org/apache/lucene/index/TestDocTermOrds.java
index 53df62b..84dd30c 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDocTermOrds.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDocTermOrds.java
@@ -29,7 +29,6 @@
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.IntField;
-import org.apache.lucene.index.DocTermOrds.TermOrdsIterator;
 import org.apache.lucene.search.FieldCache;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
@@ -63,25 +62,26 @@
     final IndexReader r = w.getReader();
     w.close();
 
-    final DocTermOrds dto = new DocTermOrds(SlowCompositeReaderWrapper.wrap(r), "field");
+    final AtomicReader ar = SlowCompositeReaderWrapper.wrap(r);
+    final DocTermOrds dto = new DocTermOrds(ar, "field");
+    SortedSetDocValues iter = dto.iterator(ar.terms("field").iterator(null));
+    
+    iter.setDocument(0);
+    assertEquals(0, iter.nextOrd());
+    assertEquals(1, iter.nextOrd());
+    assertEquals(2, iter.nextOrd());
+    assertEquals(SortedSetDocValues.NO_MORE_ORDS, iter.nextOrd());
+    
+    iter.setDocument(1);
+    assertEquals(3, iter.nextOrd());
+    assertEquals(4, iter.nextOrd());
+    assertEquals(5, iter.nextOrd());
+    assertEquals(SortedSetDocValues.NO_MORE_ORDS, iter.nextOrd());
 
-    TermOrdsIterator iter = dto.lookup(0, null);
-    final int[] buffer = new int[5];
-    assertEquals(3, iter.read(buffer));
-    assertEquals(0, buffer[0]);
-    assertEquals(1, buffer[1]);
-    assertEquals(2, buffer[2]);
-
-    iter = dto.lookup(1, iter);
-    assertEquals(3, iter.read(buffer));
-    assertEquals(3, buffer[0]);
-    assertEquals(4, buffer[1]);
-    assertEquals(5, buffer[2]);
-
-    iter = dto.lookup(2, iter);
-    assertEquals(2, iter.read(buffer));
-    assertEquals(0, buffer[0]);
-    assertEquals(5, buffer[1]);
+    iter.setDocument(2);
+    assertEquals(0, iter.nextOrd());
+    assertEquals(5, iter.nextOrd());
+    assertEquals(SortedSetDocValues.NO_MORE_ORDS, iter.nextOrd());
 
     r.close();
     dir.close();
@@ -352,31 +352,24 @@
       }
     }
 
-    TermOrdsIterator iter = null;
-    final int[] buffer = new int[5];
+    SortedSetDocValues iter = dto.iterator(te);
     for(int docID=0;docID<r.maxDoc();docID++) {
       if (VERBOSE) {
         System.out.println("TEST: docID=" + docID + " of " + r.maxDoc() + " (id=" + docIDToID.get(docID) + ")");
       }
-      iter = dto.lookup(docID, iter);
+      iter.setDocument(docID);
       final int[] answers = idToOrds[docIDToID.get(docID)];
       int upto = 0;
-      while(true) {
-        final int chunk = iter.read(buffer);
-        for(int idx=0;idx<chunk;idx++) {
-          te.seekExact((long) buffer[idx]);
-          final BytesRef expected = termsArray[answers[upto++]];
-          if (VERBOSE) {
-            System.out.println("  exp=" + expected.utf8ToString() + " actual=" + te.term().utf8ToString());
-          }
-          assertEquals("expected=" + expected.utf8ToString() + " actual=" + te.term().utf8ToString() + " ord=" + buffer[idx], expected, te.term());
+      long ord;
+      while ((ord = iter.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+        te.seekExact(ord);
+        final BytesRef expected = termsArray[answers[upto++]];
+        if (VERBOSE) {
+          System.out.println("  exp=" + expected.utf8ToString() + " actual=" + te.term().utf8ToString());
         }
-        
-        if (chunk < buffer.length) {
-          assertEquals(answers.length, upto);
-          break;
-        }
+        assertEquals("expected=" + expected.utf8ToString() + " actual=" + te.term().utf8ToString() + " ord=" + ord, expected, te.term());
       }
+      assertEquals(answers.length, upto);
     }
   }
 }
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocValuesIndexing.java b/lucene/core/src/test/org/apache/lucene/index/TestDocValuesIndexing.java
index f3e68bf..d79a081 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDocValuesIndexing.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDocValuesIndexing.java
@@ -28,6 +28,7 @@
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.search.FieldCache;
 import org.apache.lucene.store.Directory;
@@ -372,6 +373,30 @@
     iwriter.close();
     directory.close();
   }
+  
+  public void testTooLargeTermSortedSetBytes() throws IOException {
+    assumeTrue("codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Analyzer analyzer = new MockAnalyzer(random());
+
+    Directory directory = newDirectory();
+    // we don't use RandomIndexWriter because it might add more docvalues than we expect!
+    IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, analyzer);
+    iwc.setMergePolicy(newLogMergePolicy());
+    IndexWriter iwriter = new IndexWriter(directory, iwc);
+    Document doc = new Document();
+    byte bytes[] = new byte[100000];
+    BytesRef b = new BytesRef(bytes);
+    random().nextBytes(bytes);
+    doc.add(new SortedSetDocValuesField("dv", b));
+    try {
+      iwriter.addDocument(doc);
+      fail("did not get expected exception");
+    } catch (IllegalArgumentException expected) {
+      // expected
+    }
+    iwriter.close();
+    directory.close();
+  }
 
   // Two documents across segments
   public void testMixedTypesDifferentSegments() throws Exception {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDuelingCodecs.java b/lucene/core/src/test/org/apache/lucene/index/TestDuelingCodecs.java
index e2a63bb..4e54667 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDuelingCodecs.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDuelingCodecs.java
@@ -30,6 +30,8 @@
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.index.TermsEnum.SeekStatus;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.store.Directory;
@@ -102,6 +104,10 @@
     rightReader = maybeWrapReader(rightWriter.getReader());
     rightWriter.close();
     
+    // check that our readers are valid
+    _TestUtil.checkReader(leftReader);
+    _TestUtil.checkReader(rightReader);
+    
     info = "left: " + leftCodec.toString() + " / right: " + rightCodec.toString();
   }
   
@@ -135,7 +141,14 @@
     // TODO: we should add other fields that use things like docs&freqs but omit positions,
     // because linefiledocs doesn't cover all the possibilities.
     for (int i = 0; i < numdocs; i++) {
-      writer.addDocument(lineFileDocs.nextDoc());
+      Document document = lineFileDocs.nextDoc();
+      // grab the title and add some SortedSet instances for fun
+      String title = document.get("titleTokenized");
+      String split[] = title.split("\\s+");
+      for (String trash : split) {
+        document.add(new SortedSetDocValuesField("sortedset", new BytesRef(trash)));
+      }
+      writer.addDocument(document);
     }
     
     lineFileDocs.close();
@@ -683,6 +696,36 @@
           assertNull(info, rightValues);
         }
       }
+      
+      {
+        SortedSetDocValues leftValues = MultiDocValues.getSortedSetValues(leftReader, field);
+        SortedSetDocValues rightValues = MultiDocValues.getSortedSetValues(rightReader, field);
+        if (leftValues != null && rightValues != null) {
+          // numOrds
+          assertEquals(info, leftValues.getValueCount(), rightValues.getValueCount());
+          // ords
+          BytesRef scratchLeft = new BytesRef();
+          BytesRef scratchRight = new BytesRef();
+          for (int i = 0; i < leftValues.getValueCount(); i++) {
+            leftValues.lookupOrd(i, scratchLeft);
+            rightValues.lookupOrd(i, scratchRight);
+            assertEquals(info, scratchLeft, scratchRight);
+          }
+          // ord lists
+          for(int docID=0;docID<leftReader.maxDoc();docID++) {
+            leftValues.setDocument(docID);
+            rightValues.setDocument(docID);
+            long ord;
+            while ((ord = leftValues.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+              assertEquals(info, ord, rightValues.nextOrd());
+            }
+            assertEquals(info, SortedSetDocValues.NO_MORE_ORDS, rightValues.nextOrd());
+          }
+        } else {
+          assertNull(info, leftValues);
+          assertNull(info, rightValues);
+        }
+      }
     }
   }
   
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 3a73ba2..8fcfcee 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -40,6 +40,7 @@
 import org.apache.lucene.document.FieldType;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.document.StoredField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
@@ -1023,6 +1024,10 @@
       doc.add(new BinaryDocValuesField("binarydv", new BytesRef("500")));
       doc.add(new NumericDocValuesField("numericdv", 500));
       doc.add(new SortedDocValuesField("sorteddv", new BytesRef("500")));
+      if (defaultCodecSupportsSortedSet()) {
+        doc.add(new SortedSetDocValuesField("sortedsetdv", new BytesRef("one")));
+        doc.add(new SortedSetDocValuesField("sortedsetdv", new BytesRef("two")));
+      }
       w.addDocument(doc);
       doc = new Document();
       doc.add(newStringField(random, "id", "501", Field.Store.NO));
@@ -1030,6 +1035,10 @@
       doc.add(new BinaryDocValuesField("binarydv", new BytesRef("501")));
       doc.add(new NumericDocValuesField("numericdv", 501));
       doc.add(new SortedDocValuesField("sorteddv", new BytesRef("501")));
+      if (defaultCodecSupportsSortedSet()) {
+        doc.add(new SortedSetDocValuesField("sortedsetdv", new BytesRef("two")));
+        doc.add(new SortedSetDocValuesField("sortedsetdv", new BytesRef("three")));
+      }
       w.addDocument(doc);
       w.deleteDocuments(new Term("id", "500"));
       w.close();
@@ -1057,16 +1066,21 @@
             Field binaryDVField = new BinaryDocValuesField("binarydv", new BytesRef());
             Field numericDVField = new NumericDocValuesField("numericdv", 0);
             Field sortedDVField = new SortedDocValuesField("sorteddv", new BytesRef());
+            Field sortedSetDVField = new SortedSetDocValuesField("sortedsetdv", new BytesRef());
             doc.add(idField);
             doc.add(newField(random, "field", "some text contents", storedTextType));
             doc.add(binaryDVField);
             doc.add(numericDVField);
             doc.add(sortedDVField);
+            if (defaultCodecSupportsSortedSet()) {
+              doc.add(sortedSetDVField);
+            }
             for(int i=0;i<100;i++) {
               idField.setStringValue(Integer.toString(i));
               binaryDVField.setBytesValue(new BytesRef(idField.stringValue()));
               numericDVField.setLongValue(i);
               sortedDVField.setBytesValue(new BytesRef(idField.stringValue()));
+              sortedSetDVField.setBytesValue(new BytesRef(idField.stringValue()));
               int action = random.nextInt(100);
               if (action == 17) {
                 w.addIndexes(adder);
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
index 6bf5feb..16f3589 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
@@ -33,6 +33,7 @@
 import org.apache.lucene.document.FieldType;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
@@ -143,6 +144,10 @@
       doc.add(new NumericDocValuesField("numericdv", 5));
       doc.add(new BinaryDocValuesField("binarydv", new BytesRef("hello")));
       doc.add(new SortedDocValuesField("sorteddv", new BytesRef("world")));
+      if (defaultCodecSupportsSortedSet()) {
+        doc.add(new SortedSetDocValuesField("sortedsetdv", new BytesRef("hellllo")));
+        doc.add(new SortedSetDocValuesField("sortedsetdv", new BytesRef("again")));
+      }
 
       doc.add(newField(r, "content7", "aaa bbb ccc ddd", DocCopyIterator.custom4));
 
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMultiDocValues.java b/lucene/core/src/test/org/apache/lucene/index/TestMultiDocValues.java
index de9ca00..78c8974 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestMultiDocValues.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestMultiDocValues.java
@@ -17,11 +17,14 @@
  * limitations under the License.
  */
 
+import java.util.ArrayList;
+
 import org.apache.lucene.document.BinaryDocValuesField;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.LuceneTestCase;
@@ -189,4 +192,133 @@
     ir2.close();
     dir.close();
   }
+  
+  public void testSortedSet() throws Exception {
+    assumeTrue("codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory dir = newDirectory();
+    
+    IndexWriterConfig iwc = newIndexWriterConfig(random(), TEST_VERSION_CURRENT, null);
+    iwc.setMergePolicy(newLogMergePolicy());
+    RandomIndexWriter iw = new RandomIndexWriter(random(), dir, iwc);
+
+    int numDocs = atLeast(500);
+    for (int i = 0; i < numDocs; i++) {
+      Document doc = new Document();
+      int numValues = random().nextInt(5);
+      for (int j = 0; j < numValues; j++) {
+        doc.add(new SortedSetDocValuesField("bytes", new BytesRef(_TestUtil.randomUnicodeString(random()))));
+      }
+      iw.addDocument(doc);
+      if (random().nextInt(17) == 0) {
+        iw.commit();
+      }
+    }
+    DirectoryReader ir = iw.getReader();
+    iw.forceMerge(1);
+    DirectoryReader ir2 = iw.getReader();
+    AtomicReader merged = getOnlySegmentReader(ir2);
+    iw.close();
+    
+    SortedSetDocValues multi = MultiDocValues.getSortedSetValues(ir, "bytes");
+    SortedSetDocValues single = merged.getSortedSetDocValues("bytes");
+    if (multi == null) {
+      assertNull(single);
+    } else {
+      assertEquals(single.getValueCount(), multi.getValueCount());
+      BytesRef actual = new BytesRef();
+      BytesRef expected = new BytesRef();
+      // check values
+      for (long i = 0; i < single.getValueCount(); i++) {
+        single.lookupOrd(i, expected);
+        multi.lookupOrd(i, actual);
+        assertEquals(expected, actual);
+      }
+      // check ord list
+      for (int i = 0; i < numDocs; i++) {
+        single.setDocument(i);
+        ArrayList<Long> expectedList = new ArrayList<Long>();
+        long ord;
+        while ((ord = single.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+          expectedList.add(ord);
+        }
+        
+        multi.setDocument(i);
+        int upto = 0;
+        while ((ord = multi.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+          assertEquals(expectedList.get(upto).longValue(), ord);
+          upto++;
+        }
+        assertEquals(expectedList.size(), upto);
+      }
+    }
+    
+    ir.close();
+    ir2.close();
+    dir.close();
+  }
+  
+  // tries to make more dups than testSortedSet
+  public void testSortedSetWithDups() throws Exception {
+    assumeTrue("codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory dir = newDirectory();
+    
+    IndexWriterConfig iwc = newIndexWriterConfig(random(), TEST_VERSION_CURRENT, null);
+    iwc.setMergePolicy(newLogMergePolicy());
+    RandomIndexWriter iw = new RandomIndexWriter(random(), dir, iwc);
+
+    int numDocs = atLeast(500);
+    for (int i = 0; i < numDocs; i++) {
+      Document doc = new Document();
+      int numValues = random().nextInt(5);
+      for (int j = 0; j < numValues; j++) {
+        doc.add(new SortedSetDocValuesField("bytes", new BytesRef(_TestUtil.randomSimpleString(random(), 2))));
+      }
+      iw.addDocument(doc);
+      if (random().nextInt(17) == 0) {
+        iw.commit();
+      }
+    }
+    DirectoryReader ir = iw.getReader();
+    iw.forceMerge(1);
+    DirectoryReader ir2 = iw.getReader();
+    AtomicReader merged = getOnlySegmentReader(ir2);
+    iw.close();
+    
+    SortedSetDocValues multi = MultiDocValues.getSortedSetValues(ir, "bytes");
+    SortedSetDocValues single = merged.getSortedSetDocValues("bytes");
+    if (multi == null) {
+      assertNull(single);
+    } else {
+      assertEquals(single.getValueCount(), multi.getValueCount());
+      BytesRef actual = new BytesRef();
+      BytesRef expected = new BytesRef();
+      // check values
+      for (long i = 0; i < single.getValueCount(); i++) {
+        single.lookupOrd(i, expected);
+        multi.lookupOrd(i, actual);
+        assertEquals(expected, actual);
+      }
+      // check ord list
+      for (int i = 0; i < numDocs; i++) {
+        single.setDocument(i);
+        ArrayList<Long> expectedList = new ArrayList<Long>();
+        long ord;
+        while ((ord = single.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+          expectedList.add(ord);
+        }
+        
+        multi.setDocument(i);
+        int upto = 0;
+        while ((ord = multi.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+          assertEquals(expectedList.get(upto).longValue(), ord);
+          upto++;
+        }
+        assertEquals(expectedList.size(), upto);
+      }
+    }
+    
+    ir.close();
+    ir2.close();
+    dir.close();
+  }
 }
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestFieldCache.java b/lucene/core/src/test/org/apache/lucene/search/TestFieldCache.java
index 1b0ec68..fe9a57b 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestFieldCache.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestFieldCache.java
@@ -29,12 +29,14 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.codecs.DocValuesFormat;
 import org.apache.lucene.document.BinaryDocValuesField;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.IntField;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.document.StoredField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.*;
@@ -263,45 +265,33 @@
     terms = cache.getTerms(reader, "bogusfield");
 
     // getDocTermOrds
-    DocTermOrds termOrds = cache.getDocTermOrds(reader, "theRandomUnicodeMultiValuedField");
-    TermsEnum termsEnum = termOrds.getOrdTermsEnum(reader);
-    assertSame("Second request to cache return same DocTermOrds", termOrds, cache.getDocTermOrds(reader, "theRandomUnicodeMultiValuedField"));
-    DocTermOrds.TermOrdsIterator reuse = null;
+    SortedSetDocValues termOrds = cache.getDocTermOrds(reader, "theRandomUnicodeMultiValuedField");
+    int numEntries = cache.getCacheEntries().length;
+    // ask for it again, and check that we didn't create any additional entries:
+    termOrds = cache.getDocTermOrds(reader, "theRandomUnicodeMultiValuedField");
+    assertEquals(numEntries, cache.getCacheEntries().length);
+
     for (int i = 0; i < NUM_DOCS; i++) {
-      reuse = termOrds.lookup(i, reuse);
-      final int[] buffer = new int[5];
+      termOrds.setDocument(i);
       // This will remove identical terms. A DocTermOrds doesn't return duplicate ords for a docId
       List<BytesRef> values = new ArrayList<BytesRef>(new LinkedHashSet<BytesRef>(Arrays.asList(multiValued[i])));
-      for (;;) {
-        int chunk = reuse.read(buffer);
-        if (chunk == 0) {
-          for (int ord = 0; ord < values.size(); ord++) {
-            BytesRef term = values.get(ord);
-            assertNull(String.format(Locale.ROOT, "Document[%d] misses field must be null. Has value %s for ord %d", i, term, ord), term);
-          }
+      for (BytesRef v : values) {
+        if (v == null) {
+          // this test uses null values (rather than an empty list) to mark missing docs, which is confusing
           break;
         }
-
-        for(int idx=0; idx < chunk; idx++) {
-          int key = buffer[idx];
-          termsEnum.seekExact((long) key);
-          String actual = termsEnum.term().utf8ToString();
-          String expected = values.get(idx).utf8ToString();
-          if (!expected.equals(actual)) {
-              reuse = termOrds.lookup(i, reuse);
-              reuse.read(buffer);
-          }
-          assertTrue(String.format(Locale.ROOT, "Expected value %s for doc %d and ord %d, but was %s", expected, i, idx, actual), expected.equals(actual));
-        }
-
-        if (chunk <= buffer.length) {
-          break;
-        }
+        long ord = termOrds.nextOrd();
+        assert ord != SortedSetDocValues.NO_MORE_ORDS;
+        BytesRef scratch = new BytesRef();
+        termOrds.lookupOrd(ord, scratch);
+        assertEquals(v, scratch);
       }
+      assertEquals(SortedSetDocValues.NO_MORE_ORDS, termOrds.nextOrd());
     }
 
     // test bad field
     termOrds = cache.getDocTermOrds(reader, "bogusfield");
+    assertTrue(termOrds.getValueCount() == 0);
 
     FieldCache.DEFAULT.purge(reader);
   }
@@ -444,11 +434,16 @@
   
   public void testDocValuesIntegration() throws Exception {
     Directory dir = newDirectory();
-    RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
+    IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, null);
+    RandomIndexWriter iw = new RandomIndexWriter(random(), dir, iwc);
     Document doc = new Document();
     doc.add(new BinaryDocValuesField("binary", new BytesRef("binary value")));
     doc.add(new SortedDocValuesField("sorted", new BytesRef("sorted value")));
     doc.add(new NumericDocValuesField("numeric", 42));
+    if (defaultCodecSupportsSortedSet()) {
+      doc.add(new SortedSetDocValuesField("sortedset", new BytesRef("sortedset value1")));
+      doc.add(new SortedSetDocValuesField("sortedset", new BytesRef("sortedset value2")));
+    }
     iw.addDocument(doc);
     DirectoryReader ir = iw.getReader();
     iw.close();
@@ -471,15 +466,30 @@
       fail();
     } catch (IllegalStateException expected) {}
     
+    try {
+      FieldCache.DEFAULT.getDocTermOrds(ar, "binary");
+      fail();
+    } catch (IllegalStateException expected) {}
+    
+    try {
+      new DocTermOrds(ar, "binary");
+      fail();
+    } catch (IllegalStateException expected) {}
+    
     Bits bits = FieldCache.DEFAULT.getDocsWithField(ar, "binary");
     assertTrue(bits instanceof Bits.MatchAllBits);
     
-    // Sorted type: can be retrieved via getTerms() or getTermsIndex()
+    // Sorted type: can be retrieved via getTerms(), getTermsIndex(), getDocTermOrds()
     try {
       FieldCache.DEFAULT.getInts(ar, "sorted", false);
       fail();
     } catch (IllegalStateException expected) {}
     
+    try {
+      new DocTermOrds(ar, "sorted");
+      fail();
+    } catch (IllegalStateException expected) {}
+    
     binary = FieldCache.DEFAULT.getTerms(ar, "sorted");
     binary.get(0, scratch);
     assertEquals("sorted value", scratch.utf8ToString());
@@ -490,6 +500,12 @@
     sorted.get(0, scratch);
     assertEquals("sorted value", scratch.utf8ToString());
     
+    SortedSetDocValues sortedSet = FieldCache.DEFAULT.getDocTermOrds(ar, "sorted");
+    sortedSet.setDocument(0);
+    assertEquals(0, sortedSet.nextOrd());
+    assertEquals(SortedSetDocValues.NO_MORE_ORDS, sortedSet.nextOrd());
+    assertEquals(1, sortedSet.getValueCount());
+    
     bits = FieldCache.DEFAULT.getDocsWithField(ar, "sorted");
     assertTrue(bits instanceof Bits.MatchAllBits);
     
@@ -507,9 +523,52 @@
       fail();
     } catch (IllegalStateException expected) {}
     
+    try {
+      FieldCache.DEFAULT.getDocTermOrds(ar, "numeric");
+      fail();
+    } catch (IllegalStateException expected) {}
+    
+    try {
+      new DocTermOrds(ar, "numeric");
+      fail();
+    } catch (IllegalStateException expected) {}
+    
     bits = FieldCache.DEFAULT.getDocsWithField(ar, "numeric");
     assertTrue(bits instanceof Bits.MatchAllBits);
     
+    // SortedSet type: can be retrieved via getDocTermOrds() 
+    if (defaultCodecSupportsSortedSet()) {
+      try {
+        FieldCache.DEFAULT.getInts(ar, "sortedset", false);
+        fail();
+      } catch (IllegalStateException expected) {}
+    
+      try {
+        FieldCache.DEFAULT.getTerms(ar, "sortedset");
+        fail();
+      } catch (IllegalStateException expected) {}
+    
+      try {
+        FieldCache.DEFAULT.getTermsIndex(ar, "sortedset");
+        fail();
+      } catch (IllegalStateException expected) {}
+      
+      try {
+        new DocTermOrds(ar, "sortedset");
+        fail();
+      } catch (IllegalStateException expected) {}
+    
+      sortedSet = FieldCache.DEFAULT.getDocTermOrds(ar, "sortedset");
+      sortedSet.setDocument(0);
+      assertEquals(0, sortedSet.nextOrd());
+      assertEquals(1, sortedSet.nextOrd());
+      assertEquals(SortedSetDocValues.NO_MORE_ORDS, sortedSet.nextOrd());
+      assertEquals(2, sortedSet.getValueCount());
+    
+      bits = FieldCache.DEFAULT.getDocsWithField(ar, "sortedset");
+      assertTrue(bits instanceof Bits.MatchAllBits);
+    }
+    
     ir.close();
     dir.close();
   }
@@ -556,6 +615,10 @@
     sorted.get(0, scratch);
     assertTrue(scratch.bytes == BinaryDocValues.MISSING);
     
+    SortedSetDocValues sortedSet = cache.getDocTermOrds(ar, "bogusmultivalued");
+    sortedSet.setDocument(0);
+    assertEquals(SortedSetDocValues.NO_MORE_ORDS, sortedSet.nextOrd());
+    
     Bits bits = cache.getDocsWithField(ar, "bogusbits");
     assertFalse(bits.get(0));
     
@@ -577,6 +640,7 @@
     doc.add(new StoredField("bogusdoubles", "bogus"));
     doc.add(new StoredField("bogusterms", "bogus"));
     doc.add(new StoredField("bogustermsindex", "bogus"));
+    doc.add(new StoredField("bogusmultivalued", "bogus"));
     doc.add(new StoredField("bogusbits", "bogus"));
     iw.addDocument(doc);
     DirectoryReader ir = iw.getReader();
@@ -616,6 +680,10 @@
     sorted.get(0, scratch);
     assertTrue(scratch.bytes == BinaryDocValues.MISSING);
     
+    SortedSetDocValues sortedSet = cache.getDocTermOrds(ar, "bogusmultivalued");
+    sortedSet.setDocument(0);
+    assertEquals(SortedSetDocValues.NO_MORE_ORDS, sortedSet.nextOrd());
+    
     Bits bits = cache.getDocsWithField(ar, "bogusbits");
     assertFalse(bits.get(0));
     
diff --git a/lucene/core/src/test/org/apache/lucene/util/packed/TestPackedInts.java b/lucene/core/src/test/org/apache/lucene/util/packed/TestPackedInts.java
index 260b114..10e7b01 100644
--- a/lucene/core/src/test/org/apache/lucene/util/packed/TestPackedInts.java
+++ b/lucene/core/src/test/org/apache/lucene/util/packed/TestPackedInts.java
@@ -805,42 +805,55 @@
   }
 
   public void testAppendingLongBuffer() {
-    final long[] arr = new long[RandomInts.randomIntBetween(random(), 1, 2000000)];
-    for (int bpv : new int[] {0, 1, 63, 64, RandomInts.randomIntBetween(random(), 2, 61)}) {
-      if (bpv == 0) {
-        Arrays.fill(arr, random().nextLong());
-      } else if (bpv == 64) {
+    final long[] arr = new long[RandomInts.randomIntBetween(random(), 1, 1000000)];
+    for (int bpv : new int[] {0, 1, 63, 64, RandomInts.randomIntBetween(random(), 2, 62)}) {
+      for (boolean monotonic : new boolean[] {true, false}) {
+        AbstractAppendingLongBuffer buf;
+        final int inc;
+        if (monotonic) {
+          buf = new MonotonicAppendingLongBuffer();
+          inc = _TestUtil.nextInt(random(), -1000, 1000);
+        } else {
+          buf = new AppendingLongBuffer();
+          inc = 0;
+        }
+        if (bpv == 0) {
+          arr[0] = random().nextLong();
+          for (int i = 1; i < arr.length; ++i) {
+            arr[i] = arr[i-1] + inc;
+          }
+        } else if (bpv == 64) {
+          for (int i = 0; i < arr.length; ++i) {
+            arr[i] = random().nextLong();
+          }
+        } else {
+          final long minValue = _TestUtil.nextLong(random(), Long.MIN_VALUE, Long.MAX_VALUE - PackedInts.maxValue(bpv));
+          for (int i = 0; i < arr.length; ++i) {
+            arr[i] = (minValue + inc * i + random().nextLong()) & PackedInts.maxValue(bpv); // _TestUtil.nextLong is too slow
+          }
+        }
         for (int i = 0; i < arr.length; ++i) {
-          arr[i] = random().nextLong();
+          buf.add(arr[i]);
         }
-      } else {
-        final long minValue = _TestUtil.nextLong(random(), Long.MIN_VALUE, Long.MAX_VALUE - PackedInts.maxValue(bpv));
+        assertEquals(arr.length, buf.size());
+        final AbstractAppendingLongBuffer.Iterator it = buf.iterator();
         for (int i = 0; i < arr.length; ++i) {
-          arr[i] = minValue + random().nextLong() & PackedInts.maxValue(bpv); // _TestUtil.nextLong is too slow
+          if (random().nextBoolean()) {
+            assertTrue(it.hasNext());
+          }
+          assertEquals(arr[i], it.next());
         }
-      }
-      AppendingLongBuffer buf = new AppendingLongBuffer();
-      for (int i = 0; i < arr.length; ++i) {
-        buf.add(arr[i]);
-      }
-      assertEquals(arr.length, buf.size());
-      final AppendingLongBuffer.Iterator it = buf.iterator();
-      for (int i = 0; i < arr.length; ++i) {
-        if (random().nextBoolean()) {
-          assertTrue(it.hasNext());
+        assertFalse(it.hasNext());
+        
+        for (int i = 0; i < arr.length; ++i) {
+          assertEquals(arr[i], buf.get(i));
         }
-        assertEquals(arr[i], it.next());
+  
+        final long expectedBytesUsed = RamUsageEstimator.sizeOf(buf);
+        final long computedBytesUsed = buf.ramBytesUsed();
+        assertEquals("got " + computedBytesUsed + ", expected: " + expectedBytesUsed,
+            expectedBytesUsed, computedBytesUsed);
       }
-      assertFalse(it.hasNext());
-      
-      for (int i = 0; i < arr.length; ++i) {
-        assertEquals(arr[i], buf.get(i));
-      }
-
-      final long expectedBytesUsed = RamUsageEstimator.sizeOf(buf);
-      final long computedBytesUsed = buf.ramBytesUsed();
-      assertEquals("got " + computedBytesUsed + ", expected: " + expectedBytesUsed,
-          expectedBytesUsed, computedBytesUsed);
     }
   }
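
The reworked test covers both buffer flavors in one loop. A hedged sketch of
the behavioral difference, assuming the org.apache.lucene.util.packed classes
named in the diff (values and sizes here are illustrative):

    // AppendingLongBuffer packs arbitrary longs; MonotonicAppendingLongBuffer
    // additionally models a linear trend, so near-linear sequences (addresses,
    // running totals) compress down to small per-value deltas.
    AppendingLongBuffer plain = new AppendingLongBuffer();
    MonotonicAppendingLongBuffer mono = new MonotonicAppendingLongBuffer();
    for (long i = 0; i < 100; i++) {
      plain.add(i * i);       // arbitrary growth
      mono.add(10 + 7 * i);   // near-linear: stored as deltas from the trend
    }
    assert plain.get(3) == 9;
    assert mono.get(3) == 31;
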
 
diff --git a/lucene/facet/src/java/org/apache/lucene/facet/codecs/facet42/Facet42DocValuesConsumer.java b/lucene/facet/src/java/org/apache/lucene/facet/codecs/facet42/Facet42DocValuesConsumer.java
index f82f1a2..e08899c 100644
--- a/lucene/facet/src/java/org/apache/lucene/facet/codecs/facet42/Facet42DocValuesConsumer.java
+++ b/lucene/facet/src/java/org/apache/lucene/facet/codecs/facet42/Facet42DocValuesConsumer.java
@@ -98,6 +98,11 @@
   }
   
   @Override
+  public void addSortedSetField(FieldInfo field, Iterable<BytesRef> values, Iterable<Number> docToOrdCount, Iterable<Number> ords) throws IOException {
+    throw new UnsupportedOperationException("FacetsDocValues can only handle binary fields");
+  }
+
+  @Override
   public void close() throws IOException {
     boolean success = false;
     try {
diff --git a/lucene/facet/src/java/org/apache/lucene/facet/codecs/facet42/Facet42DocValuesProducer.java b/lucene/facet/src/java/org/apache/lucene/facet/codecs/facet42/Facet42DocValuesProducer.java
index 1ecfa57..ad7cb27 100644
--- a/lucene/facet/src/java/org/apache/lucene/facet/codecs/facet42/Facet42DocValuesProducer.java
+++ b/lucene/facet/src/java/org/apache/lucene/facet/codecs/facet42/Facet42DocValuesProducer.java
@@ -29,6 +29,7 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.util.IOUtils;
 
@@ -75,6 +76,11 @@
   }
 
   @Override
+  public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
+    throw new UnsupportedOperationException("FacetsDocValues only implements binary");
+  }
+
+  @Override
   public void close() throws IOException {
   }
 }
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/term/TermGroupFacetCollector.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/term/TermGroupFacetCollector.java
index 70b58d9..f4eba5a 100644
--- a/lucene/grouping/src/java/org/apache/lucene/search/grouping/term/TermGroupFacetCollector.java
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/term/TermGroupFacetCollector.java
@@ -25,6 +25,8 @@
 import org.apache.lucene.index.DocTermOrds;
 import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.index.SortedDocValuesTermsEnum;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.index.SortedSetDocValuesTermsEnum;
 import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.search.FieldCache;
 import org.apache.lucene.search.grouping.AbstractGroupFacetCollector;
@@ -196,9 +198,10 @@
   // Implementation for multi valued facet fields.
   static class MV extends TermGroupFacetCollector {
 
-    private DocTermOrds facetFieldDocTermOrds;
+    private SortedSetDocValues facetFieldDocTermOrds;
     private TermsEnum facetOrdTermsEnum;
-    private DocTermOrds.TermOrdsIterator reuse;
+    private int facetFieldNumTerms;
+    private final BytesRef scratch = new BytesRef();
 
     MV(String groupField, String facetField, BytesRef facetPrefix, int initialSize) {
       super(groupField, facetField, facetPrefix, initialSize);
@@ -207,14 +210,14 @@
     @Override
     public void collect(int doc) throws IOException {
       int groupOrd = groupFieldTermsIndex.getOrd(doc);
-      if (facetFieldDocTermOrds.isEmpty()) {
-        int segmentGroupedFacetsIndex = groupOrd * (facetFieldDocTermOrds.numTerms() + 1);
+      if (facetFieldNumTerms == 0) {
+        int segmentGroupedFacetsIndex = groupOrd * (facetFieldNumTerms + 1);
         if (facetPrefix != null || segmentGroupedFacetHits.exists(segmentGroupedFacetsIndex)) {
           return;
         }
 
         segmentTotalCount++;
-        segmentFacetCounts[facetFieldDocTermOrds.numTerms()]++;
+        segmentFacetCounts[facetFieldNumTerms]++;
 
         segmentGroupedFacetHits.put(segmentGroupedFacetsIndex);
         BytesRef groupKey;
@@ -228,51 +231,50 @@
         return;
       }
 
-      if (facetOrdTermsEnum != null) {
-        reuse = facetFieldDocTermOrds.lookup(doc, reuse);
+      facetFieldDocTermOrds.setDocument(doc);
+      long ord;
+      boolean empty = true;
+      while ((ord = facetFieldDocTermOrds.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+        process(groupOrd, (int) ord);
+        empty = false;
       }
-      int chunk;
-      boolean first = true;
-      int[] buffer = new int[5];
-      do {
-        chunk = reuse != null ? reuse.read(buffer) : 0;
-        if (first && chunk == 0) {
-          chunk = 1;
-          buffer[0] = facetFieldDocTermOrds.numTerms(); // this facet ord is reserved for docs not containing facet field.
-        }
-        first = false;
+      
+      if (empty) {
+        process(groupOrd, facetFieldNumTerms); // this facet ord is reserved for docs not containing facet field.
+      }
+    }
+    
+    private void process(int groupOrd, int facetOrd) {
+      if (facetOrd < startFacetOrd || facetOrd >= endFacetOrd) {
+        return;
+      }
 
-        for (int pos = 0; pos < chunk; pos++) {
-          int facetOrd = buffer[pos];
-          if (facetOrd < startFacetOrd || facetOrd >= endFacetOrd) {
-            continue;
-          }
+      int segmentGroupedFacetsIndex = groupOrd * (facetFieldNumTerms + 1) + facetOrd;
+      if (segmentGroupedFacetHits.exists(segmentGroupedFacetsIndex)) {
+        return;
+      }
 
-          int segmentGroupedFacetsIndex = groupOrd * (facetFieldDocTermOrds.numTerms() + 1) + facetOrd;
-          if (segmentGroupedFacetHits.exists(segmentGroupedFacetsIndex)) {
-            continue;
-          }
+      segmentTotalCount++;
+      segmentFacetCounts[facetOrd]++;
 
-          segmentTotalCount++;
-          segmentFacetCounts[facetOrd]++;
+      segmentGroupedFacetHits.put(segmentGroupedFacetsIndex);
 
-          segmentGroupedFacetHits.put(segmentGroupedFacetsIndex);
+      BytesRef groupKey;
+      if (groupOrd == -1) {
+        groupKey = null;
+      } else {
+        groupKey = new BytesRef();
+        groupFieldTermsIndex.lookupOrd(groupOrd, groupKey);
+      }
 
-          BytesRef groupKey;
-          if (groupOrd == -1) {
-            groupKey = null;
-          } else {
-            groupKey = new BytesRef();
-            groupFieldTermsIndex.lookupOrd(groupOrd, groupKey);
-          }
-
-          groupedFacetHits.add(
-              new GroupedFacetHit(groupKey,
-                  facetOrd == facetFieldDocTermOrds.numTerms() ? null : BytesRef.deepCopyOf(facetFieldDocTermOrds.lookupTerm(facetOrdTermsEnum, facetOrd))
-              )
-          );
-        }
-      } while (chunk >= buffer.length);
+      final BytesRef facetValue;
+      if (facetOrd == facetFieldNumTerms) {
+        facetValue = null;
+      } else {
+        facetFieldDocTermOrds.lookupOrd(facetOrd, scratch);
+        facetValue = BytesRef.deepCopyOf(scratch); // deep copy: scratch is reused by the next lookupOrd call
+      }
+      groupedFacetHits.add(new GroupedFacetHit(groupKey, facetValue));
     }
 
     @Override
@@ -281,12 +283,16 @@
         segmentResults.add(createSegmentResult());
       }
 
-      reuse = null;
       groupFieldTermsIndex = FieldCache.DEFAULT.getTermsIndex(context.reader(), groupField);
       facetFieldDocTermOrds = FieldCache.DEFAULT.getDocTermOrds(context.reader(), facetField);
-      facetOrdTermsEnum = facetFieldDocTermOrds.getOrdTermsEnum(context.reader());
-      // [facetFieldDocTermOrds.numTerms() + 1] for all possible facet values and docs not containing facet field
-      segmentFacetCounts = new int[facetFieldDocTermOrds.numTerms() + 1];
+      facetFieldNumTerms = (int) facetFieldDocTermOrds.getValueCount();
+      if (facetFieldNumTerms == 0) {
+        facetOrdTermsEnum = null;
+      } else {
+        facetOrdTermsEnum = new SortedSetDocValuesTermsEnum(facetFieldDocTermOrds);
+      }
+      // [facetFieldNumTerms + 1] for all possible facet values and docs not containing the facet field
+      segmentFacetCounts = new int[facetFieldNumTerms + 1];
       segmentTotalCount = 0;
 
       segmentGroupedFacetHits.clear();
@@ -303,11 +309,11 @@
           }
           facetOrd = (int) facetOrdTermsEnum.ord();
         } else {
-          facetOrd = facetFieldDocTermOrds.numTerms();
+          facetOrd = facetFieldNumTerms;
         }
 
         // (facetFieldDocTermOrds.numTerms() + 1) for all possible facet values and docs not containing facet field
-        int segmentGroupedFacetsIndex = groupOrd * (facetFieldDocTermOrds.numTerms() + 1) + facetOrd;
+        int segmentGroupedFacetsIndex = groupOrd * (facetFieldNumTerms + 1) + facetOrd;
         segmentGroupedFacetHits.put(segmentGroupedFacetsIndex);
       }
 
@@ -333,17 +339,17 @@
         if (seekStatus != TermsEnum.SeekStatus.END) {
           endFacetOrd = (int) facetOrdTermsEnum.ord();
         } else {
-          endFacetOrd = facetFieldDocTermOrds.numTerms(); // Don't include null...
+          endFacetOrd = facetFieldNumTerms; // Don't include null...
         }
       } else {
         startFacetOrd = 0;
-        endFacetOrd = facetFieldDocTermOrds.numTerms() + 1;
+        endFacetOrd = facetFieldNumTerms + 1;
       }
     }
 
     @Override
     protected SegmentResult createSegmentResult() throws IOException {
-      return new SegmentResult(segmentFacetCounts, segmentTotalCount, facetFieldDocTermOrds.numTerms(), facetOrdTermsEnum, startFacetOrd, endFacetOrd);
+      return new SegmentResult(segmentFacetCounts, segmentTotalCount, facetFieldNumTerms, facetOrdTermsEnum, startFacetOrd, endFacetOrd);
     }
 
     private static class SegmentResult extends AbstractGroupFacetCollector.SegmentResult {
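
The rewritten collect() factors the per-ord work into process() and reserves
the ord equal to the value count for documents that carry no facet value. A
hedged standalone sketch of that pattern (method and parameter names are
illustrative, not part of the patch):

    // counts has getValueCount() + 1 slots; the last slot counts documents
    // with no value for the facet field.
    static void countFacets(SortedSetDocValues dv, int doc, int[] counts) {
      dv.setDocument(doc);
      long ord = dv.nextOrd();
      if (ord == SortedSetDocValues.NO_MORE_ORDS) {
        counts[(int) dv.getValueCount()]++;  // reserved "missing" slot
        return;
      }
      do {
        counts[(int) ord]++;
      } while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS);
    }
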
diff --git a/lucene/join/src/java/org/apache/lucene/search/join/TermsCollector.java b/lucene/join/src/java/org/apache/lucene/search/join/TermsCollector.java
index 68babfd..afd589f 100644
--- a/lucene/join/src/java/org/apache/lucene/search/join/TermsCollector.java
+++ b/lucene/join/src/java/org/apache/lucene/search/join/TermsCollector.java
@@ -21,8 +21,7 @@
 
 import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.BinaryDocValues;
-import org.apache.lucene.index.DocTermOrds;
-import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.FieldCache;
 import org.apache.lucene.search.Scorer;
@@ -69,10 +68,8 @@
 
   // impl that works with multiple values per document
   static class MV extends TermsCollector {
-
-    private DocTermOrds docTermOrds;
-    private TermsEnum docTermsEnum;
-    private DocTermOrds.TermOrdsIterator reuse;
+    final BytesRef scratch = new BytesRef();
+    private SortedSetDocValues docTermOrds;
 
     MV(String field) {
       super(field);
@@ -80,29 +77,17 @@
 
     @Override
     public void collect(int doc) throws IOException {
-      reuse = docTermOrds.lookup(doc, reuse);
-      int[] buffer = new int[5];
-
-      int chunk;
-      do {
-        chunk = reuse.read(buffer);
-        if (chunk == 0) {
-          return;
-        }
-
-        for (int idx = 0; idx < chunk; idx++) {
-          int key = buffer[idx];
-          docTermsEnum.seekExact((long) key);
-          collectorTerms.add(docTermsEnum.term());
-        }
-      } while (chunk >= buffer.length);
+      docTermOrds.setDocument(doc);
+      long ord;
+      while ((ord = docTermOrds.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+        docTermOrds.lookupOrd(ord, scratch);
+        collectorTerms.add(scratch);
+      }
     }
 
     @Override
     public void setNextReader(AtomicReaderContext context) throws IOException {
       docTermOrds = FieldCache.DEFAULT.getDocTermOrds(context.reader(), field);
-      docTermsEnum = docTermOrds.getOrdTermsEnum(context.reader());
-      reuse = null; // LUCENE-3377 needs to be fixed first then this statement can be removed...
     }
   }
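
The simplification above works because BytesRefHash copies the bytes it
stores, so a single scratch BytesRef can safely be reused across lookupOrd()
calls. A hedged sketch of the collect() loop in isolation (docTermOrds and
doc are assumed bound as in the collector):

    BytesRefHash collectorTerms = new BytesRefHash();
    BytesRef scratch = new BytesRef();
    docTermOrds.setDocument(doc);
    long ord;
    while ((ord = docTermOrds.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
      docTermOrds.lookupOrd(ord, scratch);
      collectorTerms.add(scratch);  // BytesRefHash deep-copies into its pool
    }
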
 
diff --git a/lucene/join/src/java/org/apache/lucene/search/join/TermsWithScoreCollector.java b/lucene/join/src/java/org/apache/lucene/search/join/TermsWithScoreCollector.java
index 505b4f0..7be115b 100644
--- a/lucene/join/src/java/org/apache/lucene/search/join/TermsWithScoreCollector.java
+++ b/lucene/join/src/java/org/apache/lucene/search/join/TermsWithScoreCollector.java
@@ -21,8 +21,7 @@
 
 import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.BinaryDocValues;
-import org.apache.lucene.index.DocTermOrds;
-import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.FieldCache;
 import org.apache.lucene.search.Scorer;
@@ -181,9 +180,8 @@
   // impl that works with multiple values per document
   static class MV extends TermsWithScoreCollector {
 
-    DocTermOrds fromDocTermOrds;
-    TermsEnum docTermsEnum;
-    DocTermOrds.TermOrdsIterator reuse;
+    SortedSetDocValues fromDocTermOrds;
+    final BytesRef scratch = new BytesRef();
 
     MV(String field, ScoreMode scoreMode) {
       super(field, scoreMode);
@@ -191,52 +189,33 @@
 
     @Override
     public void collect(int doc) throws IOException {
-      reuse = fromDocTermOrds.lookup(doc, reuse);
-      int[] buffer = new int[5];
-
-      int chunk;
-      do {
-        chunk = reuse.read(buffer);
-        if (chunk == 0) {
-          return;
-        }
-
-        for (int idx = 0; idx < chunk; idx++) {
-          int key = buffer[idx];
-          docTermsEnum.seekExact((long) key);
-          int ord = collectedTerms.add(docTermsEnum.term());
-          if (ord < 0) {
-            ord = -ord - 1;
-          } else {
-            if (ord >= scoreSums.length) {
-              scoreSums = ArrayUtil.grow(scoreSums);
-            }
-          }
-
-          final float current = scorer.score();
-          final float existing = scoreSums[ord];
-          if (Float.compare(existing, 0.0f) == 0) {
-            scoreSums[ord] = current;
-          } else {
-            switch (scoreMode) {
-              case Total:
-                scoreSums[ord] = existing + current;
-                break;
-              case Max:
-                if (current > existing) {
-                  scoreSums[ord] = current;
-                }
-            }
+      fromDocTermOrds.setDocument(doc);
+      long ord;
+      while ((ord = fromDocTermOrds.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+        fromDocTermOrds.lookupOrd(ord, scratch);
+        
+        int termID = collectedTerms.add(scratch);
+        if (termID < 0) {
+          termID = -termID - 1;
+        } else {
+          if (termID >= scoreSums.length) {
+            scoreSums = ArrayUtil.grow(scoreSums);
           }
         }
-      } while (chunk >= buffer.length);
+        
+        switch (scoreMode) {
+          case Total:
+            scoreSums[termID] += scorer.score();
+            break;
+          case Max:
+            scoreSums[termID] = Math.max(scoreSums[termID], scorer.score());
+        }
+      }
     }
 
     @Override
     public void setNextReader(AtomicReaderContext context) throws IOException {
       fromDocTermOrds = FieldCache.DEFAULT.getDocTermOrds(context.reader(), field);
-      docTermsEnum = fromDocTermOrds.getOrdTermsEnum(context.reader());
-      reuse = null; // LUCENE-3377 needs to be fixed first then this statement can be removed...
     }
 
     static class Avg extends MV {
@@ -249,40 +228,23 @@
 
       @Override
       public void collect(int doc) throws IOException {
-        reuse = fromDocTermOrds.lookup(doc, reuse);
-        int[] buffer = new int[5];
-
-        int chunk;
-        do {
-          chunk = reuse.read(buffer);
-          if (chunk == 0) {
-            return;
-          }
-
-          for (int idx = 0; idx < chunk; idx++) {
-            int key = buffer[idx];
-            docTermsEnum.seekExact((long) key);
-            int ord = collectedTerms.add(docTermsEnum.term());
-            if (ord < 0) {
-              ord = -ord - 1;
-            } else {
-              if (ord >= scoreSums.length) {
-                scoreSums = ArrayUtil.grow(scoreSums);
-                scoreCounts = ArrayUtil.grow(scoreCounts);
-              }
-            }
-
-            float current = scorer.score();
-            float existing = scoreSums[ord];
-            if (Float.compare(existing, 0.0f) == 0) {
-              scoreSums[ord] = current;
-              scoreCounts[ord] = 1;
-            } else {
-              scoreSums[ord] = scoreSums[ord] + current;
-              scoreCounts[ord]++;
+        fromDocTermOrds.setDocument(doc);
+        long ord;
+        while ((ord = fromDocTermOrds.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+          fromDocTermOrds.lookupOrd(ord, scratch);
+          
+          int termID = collectedTerms.add(scratch);
+          if (termID < 0) {
+            termID = -termID - 1;
+          } else {
+            if (termID >= scoreSums.length) {
+              scoreSums = ArrayUtil.grow(scoreSums);
             }
           }
-        } while (chunk >= buffer.length);
+          
+          scoreSums[termID] += scorer.score();
+          scoreCounts[termID]++;
+        }
       }
 
       @Override
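
The termID handling above relies on the BytesRefHash.add() contract: a fresh
term gets the next non-negative id, while a duplicate returns
(-existingId - 1). A hedged illustration of why "termID = -termID - 1"
recovers the original id:

    BytesRefHash hash = new BytesRefHash();
    int first = hash.add(new BytesRef("foo"));   // >= 0: newly inserted
    int again = hash.add(new BytesRef("foo"));   // < 0: already present
    assert again == -first - 1;                  // so -again - 1 == first
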
diff --git a/lucene/join/src/test/org/apache/lucene/search/join/TestJoinUtil.java b/lucene/join/src/test/org/apache/lucene/search/join/TestJoinUtil.java
index a80fc24..8083cde 100644
--- a/lucene/join/src/test/org/apache/lucene/search/join/TestJoinUtil.java
+++ b/lucene/join/src/test/org/apache/lucene/search/join/TestJoinUtil.java
@@ -35,6 +35,7 @@
 import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.index.ReaderUtil;
 import org.apache.lucene.index.SlowCompositeReaderWrapper;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
@@ -466,48 +467,26 @@
         fromSearcher.search(new TermQuery(new Term("value", uniqueRandomValue)), new Collector() {
 
           private Scorer scorer;
-          private DocTermOrds docTermOrds;
-          private TermsEnum docTermsEnum;
-          private DocTermOrds.TermOrdsIterator reuse;
+          private SortedSetDocValues docTermOrds;
+          final BytesRef joinValue = new BytesRef();
 
           @Override
           public void collect(int doc) throws IOException {
-            if (docTermOrds.isEmpty()) {
-              return;
+            docTermOrds.setDocument(doc);
+            long ord;
+            while ((ord = docTermOrds.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+              docTermOrds.lookupOrd(ord, joinValue);
+              JoinScore joinScore = joinValueToJoinScores.get(joinValue);
+              if (joinScore == null) {
+                joinValueToJoinScores.put(BytesRef.deepCopyOf(joinValue), joinScore = new JoinScore());
+              }
+              joinScore.addScore(scorer.score());
             }
-
-            reuse = docTermOrds.lookup(doc, reuse);
-            int[] buffer = new int[5];
-
-            int chunk;
-            do {
-              chunk = reuse.read(buffer);
-              if (chunk == 0) {
-                return;
-              }
-
-              for (int idx = 0; idx < chunk; idx++) {
-                int key = buffer[idx];
-                docTermsEnum.seekExact((long) key);
-                BytesRef joinValue = docTermsEnum.term();
-                if (joinValue == null) {
-                  continue;
-                }
-
-                JoinScore joinScore = joinValueToJoinScores.get(joinValue);
-                if (joinScore == null) {
-                  joinValueToJoinScores.put(BytesRef.deepCopyOf(joinValue), joinScore = new JoinScore());
-                }
-                joinScore.addScore(scorer.score());
-              }
-            } while (chunk >= buffer.length);
           }
 
           @Override
           public void setNextReader(AtomicReaderContext context) throws IOException {
             docTermOrds = FieldCache.DEFAULT.getDocTermOrds(context.reader(), fromField);
-            docTermsEnum = docTermOrds.getOrdTermsEnum(context.reader());
-            reuse = null;
           }
 
           @Override
@@ -588,50 +567,33 @@
         } else {
           toSearcher.search(new MatchAllDocsQuery(), new Collector() {
 
-            private DocTermOrds docTermOrds;
-            private TermsEnum docTermsEnum;
-            private DocTermOrds.TermOrdsIterator reuse;
+            private SortedSetDocValues docTermOrds;
+            private final BytesRef scratch = new BytesRef();
             private int docBase;
 
             @Override
             public void collect(int doc) throws IOException {
-              if (docTermOrds.isEmpty()) {
-                return;
+              docTermOrds.setDocument(doc);
+              long ord;
+              while ((ord = docTermOrds.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+                docTermOrds.lookupOrd(ord, scratch);
+                JoinScore joinScore = joinValueToJoinScores.get(scratch);
+                if (joinScore == null) {
+                  continue;
+                }
+                Integer basedDoc = docBase + doc;
+                // First encountered join value determines the score.
+                // Something to keep in mind for many-to-many relations.
+                if (!docToJoinScore.containsKey(basedDoc)) {
+                  docToJoinScore.put(basedDoc, joinScore);
+                }
               }
-
-              reuse = docTermOrds.lookup(doc, reuse);
-              int[] buffer = new int[5];
-
-              int chunk;
-              do {
-                chunk = reuse.read(buffer);
-                if (chunk == 0) {
-                  return;
-                }
-
-                for (int idx = 0; idx < chunk; idx++) {
-                  int key = buffer[idx];
-                  docTermsEnum.seekExact((long) key);
-                  JoinScore joinScore = joinValueToJoinScores.get(docTermsEnum.term());
-                  if (joinScore == null) {
-                    continue;
-                  }
-                  Integer basedDoc = docBase + doc;
-                  // First encountered join value determines the score.
-                  // Something to keep in mind for many-to-many relations.
-                  if (!docToJoinScore.containsKey(basedDoc)) {
-                    docToJoinScore.put(basedDoc, joinScore);
-                  }
-                }
-              } while (chunk >= buffer.length);
             }
 
             @Override
             public void setNextReader(AtomicReaderContext context) throws IOException {
               docBase = context.docBase;
               docTermOrds = FieldCache.DEFAULT.getDocTermOrds(context.reader(), toField);
-              docTermsEnum = docTermOrds.getOrdTermsEnum(context.reader());
-              reuse = null;
             }
 
             @Override
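
Both collectors above probe a HashMap with the reused scratch ref but
deep-copy it before storing it as a key: BytesRef hashes and compares by
content, and the scratch bytes change on the next lookupOrd(). A hedged
sketch with a plain counter map (JoinScore replaced by Integer for brevity;
dv and doc assumed bound):

    Map<BytesRef, Integer> counts = new HashMap<BytesRef, Integer>();
    BytesRef scratch = new BytesRef();
    dv.setDocument(doc);
    long ord;
    while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
      dv.lookupOrd(ord, scratch);
      Integer c = counts.get(scratch);              // probing with scratch is safe
      counts.put(BytesRef.deepCopyOf(scratch),      // storing requires a copy
                 c == null ? 1 : c + 1);
    }
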
diff --git a/lucene/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java b/lucene/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java
index 623ddf7..edec042 100644
--- a/lucene/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java
+++ b/lucene/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java
@@ -46,6 +46,7 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.OrdTermState;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.index.StoredFieldVisitor;
 import org.apache.lucene.index.TermState;
 import org.apache.lucene.index.Terms;
@@ -752,6 +753,11 @@
     public SortedDocValues getSortedDocValues(String field) {
       return null;
     }
+    
+    @Override
+    public SortedSetDocValues getSortedSetDocValues(String field) {
+      return null;
+    }
 
     private class MemoryFields extends Fields {
       @Override
diff --git a/lucene/queries/src/test/org/apache/lucene/queries/function/TestDocValuesFieldSources.java b/lucene/queries/src/test/org/apache/lucene/queries/function/TestDocValuesFieldSources.java
index 2abf2c3..3d2a1a2 100644
--- a/lucene/queries/src/test/org/apache/lucene/queries/function/TestDocValuesFieldSources.java
+++ b/lucene/queries/src/test/org/apache/lucene/queries/function/TestDocValuesFieldSources.java
@@ -145,7 +145,9 @@
 
   public void test() throws IOException {
     for (DocValuesType type : DocValuesType.values()) {
-      test(type);
+      if (type != DocValuesType.SORTED_SET) {
+        test(type);
+      }
     }
   }
 
diff --git a/lucene/test-framework/src/java/org/apache/lucene/codecs/asserting/AssertingDocValuesFormat.java b/lucene/test-framework/src/java/org/apache/lucene/codecs/asserting/AssertingDocValuesFormat.java
index f9e39be..998bea7 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/codecs/asserting/AssertingDocValuesFormat.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/codecs/asserting/AssertingDocValuesFormat.java
@@ -32,8 +32,10 @@
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SegmentWriteState;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.OpenBitSet;
 
 /**
  * Just like {@link Lucene42DocValuesFormat} but with additional asserts.
@@ -127,8 +129,54 @@
       in.addSortedField(field, values, docToOrd);
     }
     
-    private <T> void checkIterator(Iterator<T> iterator, int expectedSize) {
-      for (int i = 0; i < expectedSize; i++) {
+    @Override
+    public void addSortedSetField(FieldInfo field, Iterable<BytesRef> values, Iterable<Number> docToOrdCount, Iterable<Number> ords) throws IOException {
+      long valueCount = 0;
+      BytesRef lastValue = null;
+      for (BytesRef b : values) {
+        assert b != null;
+        assert b.isValid();
+        if (valueCount > 0) {
+          assert b.compareTo(lastValue) > 0;
+        }
+        lastValue = BytesRef.deepCopyOf(b);
+        valueCount++;
+      }
+      
+      int docCount = 0;
+      long ordCount = 0;
+      OpenBitSet seenOrds = new OpenBitSet(valueCount);
+      Iterator<Number> ordIterator = ords.iterator();
+      for (Number v : docToOrdCount) {
+        assert v != null;
+        int count = v.intValue();
+        assert count >= 0;
+        docCount++;
+        ordCount += count;
+        
+        long lastOrd = -1;
+        for (int i = 0; i < count; i++) {
+          Number o = ordIterator.next();
+          assert o != null;
+          long ord = o.longValue();
+          assert ord >= 0 && ord < valueCount;
+          assert ord > lastOrd : "ord=" + ord + ",lastOrd=" + lastOrd;
+          seenOrds.set(ord);
+          lastOrd = ord;
+        }
+      }
+      assert ordIterator.hasNext() == false;
+      
+      assert docCount == maxDoc;
+      assert seenOrds.cardinality() == valueCount;
+      checkIterator(values.iterator(), valueCount);
+      checkIterator(docToOrdCount.iterator(), maxDoc);
+      checkIterator(ords.iterator(), ordCount);
+      in.addSortedSetField(field, values, docToOrdCount, ords);
+    }
+
+    private <T> void checkIterator(Iterator<T> iterator, long expectedSize) {
+      for (long i = 0; i < expectedSize; i++) {
         boolean hasNext = iterator.hasNext();
         assert hasNext;
         T v = iterator.next();
@@ -190,6 +238,14 @@
     }
     
     @Override
+    public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
+      assert field.getDocValuesType() == FieldInfo.DocValuesType.SORTED_SET;
+      SortedSetDocValues values = in.getSortedSet(field);
+      assert values != null;
+      return new AssertingAtomicReader.AssertingSortedSetDocValues(values, maxDoc);
+    }
+
+    @Override
     public void close() throws IOException {
       in.close();
     }
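
For intuition, a hedged example of a (values, docToOrdCount, ords) triple
that would pass the addSortedSetField checks, for a two-document segment over
three values (concrete data is illustrative; java.util.Arrays/List and
BytesRef imports assumed):

    // values must be sorted and unique; ords form one flat stream, sliced per
    // document by docToOrdCount, strictly increasing within each document,
    // and collectively touching every value (seenOrds.cardinality() == valueCount).
    List<BytesRef> values = Arrays.asList(
        new BytesRef("a"), new BytesRef("b"), new BytesRef("c")); // ords 0,1,2
    List<Number> docToOrdCount = Arrays.<Number>asList(2, 1);     // doc 0: 2 ords, doc 1: 1 ord
    List<Number> ords = Arrays.<Number>asList(0L, 2L, 1L);        // doc 0 -> {0,2}, doc 1 -> {1}
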
diff --git a/lucene/test-framework/src/java/org/apache/lucene/codecs/cheapbastard/CheapBastardDocValuesProducer.java b/lucene/test-framework/src/java/org/apache/lucene/codecs/cheapbastard/CheapBastardDocValuesProducer.java
index bd502a4..a38def6 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/codecs/cheapbastard/CheapBastardDocValuesProducer.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/codecs/cheapbastard/CheapBastardDocValuesProducer.java
@@ -31,6 +31,7 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
@@ -40,6 +41,7 @@
 class CheapBastardDocValuesProducer extends DocValuesProducer {
   private final Map<Integer,NumericEntry> numerics;
   private final Map<Integer,NumericEntry> ords;
+  private final Map<Integer,NumericEntry> ordIndexes;
   private final Map<Integer,BinaryEntry> binaries;
   private final IndexInput data;
   
@@ -54,6 +56,7 @@
                                 DiskDocValuesFormat.VERSION_START);
       numerics = new HashMap<Integer,NumericEntry>();
       ords = new HashMap<Integer,NumericEntry>();
+      ordIndexes = new HashMap<Integer,NumericEntry>();
       binaries = new HashMap<Integer,BinaryEntry>();
       readFields(in);
       success = true;
@@ -100,6 +103,36 @@
         }
         NumericEntry n = readNumericEntry(meta);
         ords.put(fieldNumber, n);
+      } else if (type == DiskDocValuesFormat.SORTED_SET) {
+        // sortedset = binary + numeric + ordIndex
+        if (meta.readVInt() != fieldNumber) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        if (meta.readByte() != DiskDocValuesFormat.BINARY) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        BinaryEntry b = readBinaryEntry(meta);
+        binaries.put(fieldNumber, b);
+        
+        if (meta.readVInt() != fieldNumber) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        if (meta.readByte() != DiskDocValuesFormat.NUMERIC) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        NumericEntry n1 = readNumericEntry(meta);
+        ords.put(fieldNumber, n1);
+        
+        if (meta.readVInt() != fieldNumber) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        if (meta.readByte() != DiskDocValuesFormat.NUMERIC) {
+          throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
+        }
+        NumericEntry n2 = readNumericEntry(meta);
+        ordIndexes.put(fieldNumber, n2);
+      } else {
+        throw new CorruptIndexException("invalid type: " + type + ", resource=" + meta);
       }
       fieldNumber = meta.readVInt();
     }
@@ -109,7 +142,7 @@
     NumericEntry entry = new NumericEntry();
     entry.packedIntsVersion = meta.readVInt();
     entry.offset = meta.readLong();
-    entry.count = meta.readVInt();
+    entry.count = meta.readVLong();
     entry.blockSize = meta.readVInt();
     return entry;
   }
@@ -118,7 +151,7 @@
     BinaryEntry entry = new BinaryEntry();
     entry.minLength = meta.readVInt();
     entry.maxLength = meta.readVInt();
-    entry.count = meta.readVInt();
+    entry.count = meta.readVLong();
     entry.offset = meta.readLong();
     if (entry.minLength != entry.maxLength) {
       entry.addressesOffset = meta.readLong();
@@ -134,15 +167,15 @@
     return getNumeric(field, entry);
   }
   
-  private NumericDocValues getNumeric(FieldInfo field, final NumericEntry entry) throws IOException {
+  private LongNumericDocValues getNumeric(FieldInfo field, final NumericEntry entry) throws IOException {
     final IndexInput data = this.data.clone();
     data.seek(entry.offset);
 
     final BlockPackedReader reader = new BlockPackedReader(data, entry.packedIntsVersion, entry.blockSize, entry.count, true);
-    return new NumericDocValues() {
+    return new LongNumericDocValues() {
       @Override
-      public long get(int docID) {
-        return reader.get(docID);
+      public long get(long id) {
+        return reader.get(id);
       }
     };
   }
@@ -160,10 +193,10 @@
   private BinaryDocValues getFixedBinary(FieldInfo field, final BinaryEntry bytes) {
     final IndexInput data = this.data.clone();
 
-    return new BinaryDocValues() {
+    return new LongBinaryDocValues() {
       @Override
-      public void get(int docID, BytesRef result) {
-        long address = bytes.offset + docID * (long)bytes.maxLength;
+      public void get(long id, BytesRef result) {
+        long address = bytes.offset + id * bytes.maxLength;
         try {
           data.seek(address);
           // NOTE: we could have one buffer, but various consumers (e.g. FieldComparatorSource) 
@@ -185,11 +218,11 @@
     data.seek(bytes.addressesOffset);
 
     final MonotonicBlockPackedReader addresses = new MonotonicBlockPackedReader(data, bytes.packedIntsVersion, bytes.blockSize, bytes.count, true);
-    return new BinaryDocValues() {
+    return new LongBinaryDocValues() {
       @Override
-      public void get(int docID, BytesRef result) {
-        long startAddress = bytes.offset + (docID == 0 ? 0 : + addresses.get(docID-1));
-        long endAddress = bytes.offset + addresses.get(docID);
+      public void get(long id, BytesRef result) {
+        long startAddress = bytes.offset + (id == 0 ? 0 : addresses.get(id-1));
+        long endAddress = bytes.offset + addresses.get(id);
         int length = (int) (endAddress - startAddress);
         try {
           data.seek(startAddress);
@@ -209,7 +242,7 @@
 
   @Override
   public SortedDocValues getSorted(FieldInfo field) throws IOException {
-    final int valueCount = binaries.get(field.number).count;
+    final int valueCount = (int) binaries.get(field.number).count;
     final BinaryDocValues binary = getBinary(field);
     final NumericDocValues ordinals = getNumeric(field, ords.get(field.number));
     return new SortedDocValues() {
@@ -232,6 +265,49 @@
   }
 
   @Override
+  public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
+    final long valueCount = binaries.get(field.number).count;
+    final LongBinaryDocValues binary = (LongBinaryDocValues) getBinary(field);
+    final LongNumericDocValues ordinals = getNumeric(field, ords.get(field.number));
+    NumericEntry entry = ordIndexes.get(field.number);
+    IndexInput data = this.data.clone();
+    data.seek(entry.offset);
+    final MonotonicBlockPackedReader ordIndex = new MonotonicBlockPackedReader(data, entry.packedIntsVersion, entry.blockSize, entry.count, true);
+    
+    return new SortedSetDocValues() {
+      long offset;
+      long endOffset;
+      
+      @Override
+      public long nextOrd() {
+        if (offset == endOffset) {
+          return NO_MORE_ORDS;
+        } else {
+          long ord = ordinals.get(offset);
+          offset++;
+          return ord;
+        }
+      }
+
+      @Override
+      public void setDocument(int docID) {
+        offset = (docID == 0 ? 0 : ordIndex.get(docID-1));
+        endOffset = ordIndex.get(docID);
+      }
+
+      @Override
+      public void lookupOrd(long ord, BytesRef result) {
+        binary.get(ord, result);
+      }
+
+      @Override
+      public long getValueCount() {
+        return valueCount;
+      }
+    };
+  }
+
+  @Override
   public void close() throws IOException {
     data.close();
   }
@@ -240,18 +316,37 @@
     long offset;
 
     int packedIntsVersion;
-    int count;
+    long count;
     int blockSize;
   }
   
   static class BinaryEntry {
     long offset;
 
-    int count;
+    long count;
     int minLength;
     int maxLength;
     long addressesOffset;
     int packedIntsVersion;
     int blockSize;
   }
+  
+  // internally we compose complex docvalues (sorted/sortedset) from simpler binary/numeric ones
+  static abstract class LongNumericDocValues extends NumericDocValues {
+    @Override
+    public final long get(int docID) {
+      return get((long) docID);
+    }
+    
+    abstract long get(long id);
+  }
+  
+  static abstract class LongBinaryDocValues extends BinaryDocValues {
+    @Override
+    public final void get(int docID, BytesRef result) {
+      get((long)docID, result);
+    }
+    
+    abstract void get(long id, BytesRef result);
+  }
 }
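
The getSortedSet() view above is driven entirely by the monotonic ordIndex:
per-document ord counts were written as a running total, so a document's ords
occupy the half-open slice [ordIndex[doc-1], ordIndex[doc]) of the flat ord
stream. A minimal sketch of that addressing with plain arrays instead of the
codec readers (illustrative only):

    // counts [2, 0, 1] are stored as running totals [2, 2, 3]
    static long[] docOrdSlice(long[] ordIndex, int doc) {
      long start = doc == 0 ? 0 : ordIndex[doc - 1];
      long end = ordIndex[doc];          // end - start == ord count for doc
      return new long[] { start, end };
    }
    // docOrdSlice(new long[] {2, 2, 3}, 0) -> {0, 2}   two ords
    // docOrdSlice(new long[] {2, 2, 3}, 1) -> {2, 2}   empty: doc 1 has no values
    // docOrdSlice(new long[] {2, 2, 3}, 2) -> {2, 3}   one ord
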
diff --git a/lucene/test-framework/src/java/org/apache/lucene/codecs/lucene40/Lucene40DocValuesWriter.java b/lucene/test-framework/src/java/org/apache/lucene/codecs/lucene40/Lucene40DocValuesWriter.java
index 8f95546..2769abd 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/codecs/lucene40/Lucene40DocValuesWriter.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/codecs/lucene40/Lucene40DocValuesWriter.java
@@ -494,6 +494,11 @@
   }
   
   @Override
+  public void addSortedSetField(FieldInfo field, Iterable<BytesRef> values, Iterable<Number> docToOrdCount, Iterable<Number> ords) throws IOException {
+    throw new UnsupportedOperationException("Lucene 4.0 does not support SortedSet docvalues");
+  }
+
+  @Override
   public void close() throws IOException {
     dir.close();
   }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/AssertingAtomicReader.java b/lucene/test-framework/src/java/org/apache/lucene/index/AssertingAtomicReader.java
index e5d7bf0..7bcd9f6 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/AssertingAtomicReader.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/AssertingAtomicReader.java
@@ -453,6 +453,62 @@
       return result;
     }
   }
+  
+  /** Wraps a SortedSetDocValues but with additional asserts */
+  public static class AssertingSortedSetDocValues extends SortedSetDocValues {
+    private final SortedSetDocValues in;
+    private final int maxDoc;
+    private final long valueCount;
+    long lastOrd = NO_MORE_ORDS;
+    
+    public AssertingSortedSetDocValues(SortedSetDocValues in, int maxDoc) {
+      this.in = in;
+      this.maxDoc = maxDoc;
+      this.valueCount = in.getValueCount();
+      assert valueCount >= 0;
+    }
+    
+    @Override
+    public long nextOrd() {
+      assert lastOrd != NO_MORE_ORDS;
+      long ord = in.nextOrd();
+      assert ord < valueCount;
+      assert ord == NO_MORE_ORDS || ord > lastOrd;
+      lastOrd = ord;
+      return ord;
+    }
+
+    @Override
+    public void setDocument(int docID) {
+      assert docID >= 0 && docID < maxDoc : "docid=" + docID + ",maxDoc=" + maxDoc;
+      in.setDocument(docID);
+      lastOrd = -2;
+    }
+
+    @Override
+    public void lookupOrd(long ord, BytesRef result) {
+      assert ord >= 0 && ord < valueCount;
+      assert result.isValid();
+      in.lookupOrd(ord, result);
+      assert result.isValid();
+    }
+
+    @Override
+    public long getValueCount() {
+      long valueCount = in.getValueCount();
+      assert valueCount == this.valueCount; // should not change
+      return valueCount;
+    }
+
+    @Override
+    public long lookupTerm(BytesRef key) {
+      assert key.isValid();
+      long result = in.lookupTerm(key);
+      assert result < valueCount;
+      assert key.isValid();
+      return result;
+    }
+  }
 
   @Override
   public NumericDocValues getNumericDocValues(String field) throws IOException {
@@ -497,6 +553,20 @@
   }
 
   @Override
+  public SortedSetDocValues getSortedSetDocValues(String field) throws IOException {
+    SortedSetDocValues dv = super.getSortedSetDocValues(field);
+    FieldInfo fi = getFieldInfos().fieldInfo(field);
+    if (dv != null) {
+      assert fi != null;
+      assert fi.getDocValuesType() == FieldInfo.DocValuesType.SORTED_SET;
+      return new AssertingSortedSetDocValues(dv, maxDoc());
+    } else {
+      assert fi == null || fi.getDocValuesType() != FieldInfo.DocValuesType.SORTED_SET;
+      return null;
+    }
+  }
+
+  @Override
   public NumericDocValues getNormValues(String field) throws IOException {
     NumericDocValues dv = super.getNormValues(field);
     FieldInfo fi = getFieldInfos().fieldInfo(field);
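
The lastOrd bookkeeping in AssertingSortedSetDocValues doubles as a small
state machine: it starts at NO_MORE_ORDS so that nextOrd() before any
setDocument() asserts; setDocument() resets it to -2 so the first real ord
(>= 0) satisfies ord > lastOrd; and once NO_MORE_ORDS comes back, another
nextOrd() trips the same precondition. A hedged usage sketch, run with -ea
("in" and "maxDoc" assumed bound):

    SortedSetDocValues dv =
        new AssertingAtomicReader.AssertingSortedSetDocValues(in, maxDoc);
    dv.setDocument(0);                 // lastOrd becomes -2
    long ord;
    while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
      // ords arrive strictly increasing; a decrease would fail ord > lastOrd
    }
    // dv.nextOrd();  // would now throw AssertionError: lastOrd == NO_MORE_ORDS
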
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/BaseDocValuesFormatTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/index/BaseDocValuesFormatTestCase.java
index 9ff1795..d9ee91a 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/BaseDocValuesFormatTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/BaseDocValuesFormatTestCase.java
@@ -17,10 +17,16 @@
  * limitations under the License.
  */
 
+import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
+
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.Map.Entry;
 
 import org.apache.lucene.analysis.Analyzer;
@@ -34,12 +40,14 @@
 import org.apache.lucene.document.FloatDocValuesField;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.document.StoredField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.FieldInfo.DocValuesType;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.FieldCache;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreDoc;
@@ -1266,4 +1274,602 @@
       doTestSortedVsStoredFields(1, 10);
     }
   }
+  
+  public void testSortedSetOneValue() throws IOException {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory directory = newDirectory();
+    RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory);
+    
+    Document doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    iwriter.addDocument(doc);
+    
+    DirectoryReader ireader = iwriter.getReader();
+    iwriter.close();
+    
+    SortedSetDocValues dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field");
+    
+    dv.setDocument(0);
+    assertEquals(0, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    BytesRef bytes = new BytesRef();
+    dv.lookupOrd(0, bytes);
+    assertEquals(new BytesRef("hello"), bytes);
+
+    ireader.close();
+    directory.close();
+  }
+  
+  public void testSortedSetTwoFields() throws IOException {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory directory = newDirectory();
+    RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory);
+    
+    Document doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    doc.add(new SortedSetDocValuesField("field2", new BytesRef("world")));
+    iwriter.addDocument(doc);
+    
+    DirectoryReader ireader = iwriter.getReader();
+    iwriter.close();
+    
+    SortedSetDocValues dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field");
+    
+    dv.setDocument(0);
+    assertEquals(0, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    BytesRef bytes = new BytesRef();
+    dv.lookupOrd(0, bytes);
+    assertEquals(new BytesRef("hello"), bytes);
+    
+    dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field2");
+
+    dv.setDocument(0);
+    assertEquals(0, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    dv.lookupOrd(0, bytes);
+    assertEquals(new BytesRef("world"), bytes);
+    
+    ireader.close();
+    directory.close();
+  }
+  
+  public void testSortedSetTwoDocumentsMerged() throws IOException {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory directory = newDirectory();
+    Analyzer analyzer = new MockAnalyzer(random());
+    IndexWriterConfig iwconfig = newIndexWriterConfig(TEST_VERSION_CURRENT, analyzer);
+    iwconfig.setMergePolicy(newLogMergePolicy());
+    RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory, iwconfig);
+  
+    Document doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    iwriter.addDocument(doc);
+    iwriter.commit();
+    
+    doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("world")));
+    iwriter.addDocument(doc);
+    iwriter.forceMerge(1);
+    
+    DirectoryReader ireader = iwriter.getReader();
+    iwriter.close();
+
+    SortedSetDocValues dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field");
+    assertEquals(2, dv.getValueCount());
+    
+    dv.setDocument(0);
+    assertEquals(0, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    BytesRef bytes = new BytesRef();
+    dv.lookupOrd(0, bytes);
+    assertEquals(new BytesRef("hello"), bytes);
+    
+    dv.setDocument(1);
+    assertEquals(1, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    dv.lookupOrd(1, bytes);
+    assertEquals(new BytesRef("world"), bytes);   
+
+    ireader.close();
+    directory.close();
+  }
+  
+  public void testSortedSetTwoValues() throws IOException {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory directory = newDirectory();
+    RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory);
+    
+    Document doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("world")));
+    iwriter.addDocument(doc);
+    
+    DirectoryReader ireader = iwriter.getReader();
+    iwriter.close();
+    
+    SortedSetDocValues dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field");
+    
+    dv.setDocument(0);
+    assertEquals(0, dv.nextOrd());
+    assertEquals(1, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    BytesRef bytes = new BytesRef();
+    dv.lookupOrd(0, bytes);
+    assertEquals(new BytesRef("hello"), bytes);
+    
+    dv.lookupOrd(1, bytes);
+    assertEquals(new BytesRef("world"), bytes);
+
+    ireader.close();
+    directory.close();
+  }
+  
+  public void testSortedSetTwoValuesUnordered() throws IOException {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory directory = newDirectory();
+    RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory);
+    
+    Document doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("world")));
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    iwriter.addDocument(doc);
+    
+    DirectoryReader ireader = iwriter.getReader();
+    iwriter.close();
+    
+    SortedSetDocValues dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field");
+    
+    dv.setDocument(0);
+    assertEquals(0, dv.nextOrd());
+    assertEquals(1, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    BytesRef bytes = new BytesRef();
+    dv.lookupOrd(0, bytes);
+    assertEquals(new BytesRef("hello"), bytes);
+    
+    dv.lookupOrd(1, bytes);
+    assertEquals(new BytesRef("world"), bytes);
+
+    ireader.close();
+    directory.close();
+  }
+  
+  public void testSortedSetThreeValuesTwoDocs() throws IOException {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory directory = newDirectory();
+    Analyzer analyzer = new MockAnalyzer(random());
+    IndexWriterConfig iwconfig = newIndexWriterConfig(TEST_VERSION_CURRENT, analyzer);
+    iwconfig.setMergePolicy(newLogMergePolicy());
+    RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory, iwconfig);
+    
+    Document doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("world")));
+    iwriter.addDocument(doc);
+    iwriter.commit();
+    
+    doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("beer")));
+    iwriter.addDocument(doc);
+    iwriter.forceMerge(1);
+    
+    DirectoryReader ireader = iwriter.getReader();
+    iwriter.close();
+
+    SortedSetDocValues dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field");
+    assertEquals(3, dv.getValueCount());
+    
+    dv.setDocument(0);
+    assertEquals(1, dv.nextOrd());
+    assertEquals(2, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    dv.setDocument(1);
+    assertEquals(0, dv.nextOrd());
+    assertEquals(1, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    BytesRef bytes = new BytesRef();
+    dv.lookupOrd(0, bytes);
+    assertEquals(new BytesRef("beer"), bytes);
+    
+    dv.lookupOrd(1, bytes);
+    assertEquals(new BytesRef("hello"), bytes);
+    
+    dv.lookupOrd(2, bytes);
+    assertEquals(new BytesRef("world"), bytes);
+
+    ireader.close();
+    directory.close();
+  }
+  
+  public void testSortedSetTwoDocumentsLastMissing() throws IOException {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory directory = newDirectory();
+    Analyzer analyzer = new MockAnalyzer(random());
+    IndexWriterConfig iwconfig = newIndexWriterConfig(TEST_VERSION_CURRENT, analyzer);
+    iwconfig.setMergePolicy(newLogMergePolicy());
+    RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory, iwconfig);
+    
+    Document doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    iwriter.addDocument(doc);
+    
+    doc = new Document();
+    iwriter.addDocument(doc);
+    iwriter.forceMerge(1);
+    DirectoryReader ireader = iwriter.getReader();
+    iwriter.close();
+    
+    SortedSetDocValues dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field");
+    assertEquals(1, dv.getValueCount());
+    
+    dv.setDocument(0);
+    assertEquals(0, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    BytesRef bytes = new BytesRef();
+    dv.lookupOrd(0, bytes);
+    assertEquals(new BytesRef("hello"), bytes);
+    
+    ireader.close();
+    directory.close();
+  }
+  
+  public void testSortedSetTwoDocumentsLastMissingMerge() throws IOException {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory directory = newDirectory();
+    Analyzer analyzer = new MockAnalyzer(random());
+    IndexWriterConfig iwconfig = newIndexWriterConfig(TEST_VERSION_CURRENT, analyzer);
+    iwconfig.setMergePolicy(newLogMergePolicy());
+    RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory, iwconfig);
+    
+    Document doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    iwriter.addDocument(doc);
+    iwriter.commit();
+    
+    doc = new Document();
+    iwriter.addDocument(doc);
+    iwriter.forceMerge(1);
+   
+    DirectoryReader ireader = iwriter.getReader();
+    iwriter.close();
+    
+    SortedSetDocValues dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field");
+    assertEquals(1, dv.getValueCount());
+
+    dv.setDocument(0);
+    assertEquals(0, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    BytesRef bytes = new BytesRef();
+    dv.lookupOrd(0, bytes);
+    assertEquals(new BytesRef("hello"), bytes);
+    
+    ireader.close();
+    directory.close();
+  }
+  
+  public void testSortedSetTwoDocumentsFirstMissing() throws IOException {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory directory = newDirectory();
+    Analyzer analyzer = new MockAnalyzer(random());
+    IndexWriterConfig iwconfig = newIndexWriterConfig(TEST_VERSION_CURRENT, analyzer);
+    iwconfig.setMergePolicy(newLogMergePolicy());
+    RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory, iwconfig);
+    
+    Document doc = new Document();
+    iwriter.addDocument(doc);
+    
+    doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    iwriter.addDocument(doc);
+    
+    iwriter.forceMerge(1);
+    DirectoryReader ireader = iwriter.getReader();
+    iwriter.close();
+    
+    SortedSetDocValues dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field");
+    assertEquals(1, dv.getValueCount());
+
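+    // doc 0 has no values for the field; only doc 1 contributes an ord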
+    dv.setDocument(1);
+    assertEquals(0, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    BytesRef bytes = new BytesRef();
+    dv.lookupOrd(0, bytes);
+    assertEquals(new BytesRef("hello"), bytes);
+    
+    ireader.close();
+    directory.close();
+  }
+  
+  public void testSortedSetTwoDocumentsFirstMissingMerge() throws IOException {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory directory = newDirectory();
+    Analyzer analyzer = new MockAnalyzer(random());
+    IndexWriterConfig iwconfig = newIndexWriterConfig(TEST_VERSION_CURRENT, analyzer);
+    iwconfig.setMergePolicy(newLogMergePolicy());
+    RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory, iwconfig);
+    
+    Document doc = new Document();
+    iwriter.addDocument(doc);
+    iwriter.commit();
+    
+    doc = new Document();
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    iwriter.addDocument(doc);
+    iwriter.forceMerge(1);
+    
+    DirectoryReader ireader = iwriter.getReader();
+    iwriter.close();
+    
+    SortedSetDocValues dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field");
+    assertEquals(1, dv.getValueCount());
+
+    dv.setDocument(1);
+    assertEquals(0, dv.nextOrd());
+    assertEquals(NO_MORE_ORDS, dv.nextOrd());
+    
+    BytesRef bytes = new BytesRef();
+    dv.lookupOrd(0, bytes);
+    assertEquals(new BytesRef("hello"), bytes);
+    
+    ireader.close();
+    directory.close();
+  }
+  
+  public void testSortedSetMergeAwayAllValues() throws IOException {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    Directory directory = newDirectory();
+    Analyzer analyzer = new MockAnalyzer(random());
+    IndexWriterConfig iwconfig = newIndexWriterConfig(TEST_VERSION_CURRENT, analyzer);
+    iwconfig.setMergePolicy(newLogMergePolicy());
+    RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory, iwconfig);
+    
+    Document doc = new Document();
+    doc.add(new StringField("id", "0", Field.Store.NO));
+    iwriter.addDocument(doc);    
+    doc = new Document();
+    doc.add(new StringField("id", "1", Field.Store.NO));
+    doc.add(new SortedSetDocValuesField("field", new BytesRef("hello")));
+    iwriter.addDocument(doc);
+    iwriter.commit();
+    iwriter.deleteDocuments(new Term("id", "1"));
+    iwriter.forceMerge(1);
+    
+    DirectoryReader ireader = iwriter.getReader();
+    iwriter.close();
+    
+    SortedSetDocValues dv = getOnlySegmentReader(ireader).getSortedSetDocValues("field");
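+    // the only document carrying a value was deleted before the merge, so the dictionary is empty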
+    assertEquals(0, dv.getValueCount());
+    
+    ireader.close();
+    directory.close();
+  }
+  
+  private void doTestSortedSetVsStoredFields(int minLength, int maxLength) throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    RandomIndexWriter writer = new RandomIndexWriter(random(), dir, conf);
+    
+    // index some docs
+    int numDocs = atLeast(1000);
+    for (int i = 0; i < numDocs; i++) {
+      Document doc = new Document();
+      Field idField = new StringField("id", Integer.toString(i), Field.Store.NO);
+      doc.add(idField);
+      final int length;
+      if (minLength == maxLength) {
+        length = minLength; // fixed length
+      } else {
+        length = _TestUtil.nextInt(random(), minLength, maxLength);
+      }
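+      // 0..16 values per doc; zero values exercises documents missing the field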
+      int numValues = random().nextInt(17);
+      // create a random set of strings
+      Set<String> values = new TreeSet<String>();
+      for (int v = 0; v < numValues; v++) {
+        values.add(_TestUtil.randomSimpleString(random(), length));
+      }
+      
+      // add ordered to the stored field
+      for (String v : values) {
+        doc.add(new StoredField("stored", v));
+      }
+
+      // add in any order to the dv field
+      ArrayList<String> unordered = new ArrayList<String>(values);
+      Collections.shuffle(unordered, random());
+      for (String v : unordered) {
+        doc.add(new SortedSetDocValuesField("dv", new BytesRef(v)));
+      }
+
+      writer.addDocument(doc);
+      if (random().nextInt(31) == 0) {
+        writer.commit();
+      }
+    }
+    
+    // delete some docs
+    int numDeletions = random().nextInt(numDocs/10);
+    for (int i = 0; i < numDeletions; i++) {
+      int id = random().nextInt(numDocs);
+      writer.deleteDocuments(new Term("id", Integer.toString(id)));
+    }
+    writer.close();
+    
+    // compare
+    DirectoryReader ir = DirectoryReader.open(dir);
+    for (AtomicReaderContext context : ir.leaves()) {
+      AtomicReader r = context.reader();
+      SortedSetDocValues docValues = r.getSortedSetDocValues("dv");
+      BytesRef scratch = new BytesRef();
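+      // stored values were added in sorted order and nextOrd() returns ords in
+      // increasing order, so the two sequences must match one-to-one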
+      for (int i = 0; i < r.maxDoc(); i++) {
+        String[] stringValues = r.document(i).getValues("stored");
+        if (docValues != null) {
+          docValues.setDocument(i);
+        }
+        for (int j = 0; j < stringValues.length; j++) {
+          assertNotNull(docValues);
+          long ord = docValues.nextOrd();
+          assertTrue(ord != NO_MORE_ORDS);
+          docValues.lookupOrd(ord, scratch);
+          assertEquals(stringValues[j], scratch.utf8ToString());
+        }
+        assertTrue(docValues == null || docValues.nextOrd() == NO_MORE_ORDS);
+      }
+    }
+    ir.close();
+    dir.close();
+  }
+  
+  public void testSortedSetFixedLengthVsStoredFields() throws Exception {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    int numIterations = atLeast(1);
+    for (int i = 0; i < numIterations; i++) {
+      int fixedLength = _TestUtil.nextInt(random(), 1, 10);
+      doTestSortedSetVsStoredFields(fixedLength, fixedLength);
+    }
+  }
+  
+  public void testSortedSetVariableLengthVsStoredFields() throws Exception {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    int numIterations = atLeast(1);
+    for (int i = 0; i < numIterations; i++) {
+      doTestSortedSetVsStoredFields(1, 10);
+    }
+  }
+  
+  private void assertEquals(int maxDoc, SortedSetDocValues expected, SortedSetDocValues actual) throws Exception {
+    // can be null for the segment if no docs actually had any SortedSetDocValues
+    // in this case FC.getDocTermOrds returns EMPTY
+    if (actual == null) {
+      assertEquals(SortedSetDocValues.EMPTY, expected);
+      return;
+    }
+    assertEquals(expected.getValueCount(), actual.getValueCount());
+    // compare ord lists
+    for (int i = 0; i < maxDoc; i++) {
+      expected.setDocument(i);
+      actual.setDocument(i);
+      long expectedOrd;
+      while ((expectedOrd = expected.nextOrd()) != NO_MORE_ORDS) {
+        assertEquals(expectedOrd, actual.nextOrd());
+      }
+      assertEquals(NO_MORE_ORDS, actual.nextOrd());
+    }
+    
+    // compare ord dictionary
+    BytesRef expectedBytes = new BytesRef();
+    BytesRef actualBytes = new BytesRef();
+    for (long i = 0; i < expected.getValueCount(); i++) {
+      expected.lookupOrd(i, expectedBytes);
+      actual.lookupOrd(i, actualBytes);
+      assertEquals(expectedBytes, actualBytes);
+    }
+  }
+  
+  private void doTestSortedSetVsUninvertedField(int minLength, int maxLength) throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    RandomIndexWriter writer = new RandomIndexWriter(random(), dir, conf);
+    
+    // index some docs
+    int numDocs = atLeast(1000);
+    for (int i = 0; i < numDocs; i++) {
+      Document doc = new Document();
+      Field idField = new StringField("id", Integer.toString(i), Field.Store.NO);
+      doc.add(idField);
+      final int length;
+      if (minLength == maxLength) {
+        length = minLength; // fixed length
+      } else {
+        length = _TestUtil.nextInt(random(), minLength, maxLength);
+      }
+      int numValues = random().nextInt(17);
+      // create a random list of strings
+      List<String> values = new ArrayList<String>();
+      for (int v = 0; v < numValues; v++) {
+        values.add(_TestUtil.randomSimpleString(random(), length));
+      }
+      
+      // add in any order to the indexed field
+      ArrayList<String> unordered = new ArrayList<String>(values);
+      Collections.shuffle(unordered, random());
+      for (String v : unordered) {
+        doc.add(newStringField("indexed", v, Field.Store.NO));
+      }
+
+      // add in any order to the dv field
+      ArrayList<String> unordered2 = new ArrayList<String>(values);
+      Collections.shuffle(unordered2, random());
+      for (String v : unordered2) {
+        doc.add(new SortedSetDocValuesField("dv", new BytesRef(v)));
+      }
+
+      writer.addDocument(doc);
+      if (random().nextInt(31) == 0) {
+        writer.commit();
+      }
+    }
+    
+    // compare per-segment
+    // NOTE: we must do this before deleting, because FC.getDocTermOrds/UninvertedField
+    // "bakes in" the deletes at the time it was first called.
+    DirectoryReader ir = writer.getReader();
+    for (AtomicReaderContext context : ir.leaves()) {
+      AtomicReader r = context.reader();
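+      // uninverting the indexed field yields a SortedSetDocValues view to compare against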
+      SortedSetDocValues expected = FieldCache.DEFAULT.getDocTermOrds(r, "indexed");
+      SortedSetDocValues actual = r.getSortedSetDocValues("dv");
+      assertEquals(r.maxDoc(), expected, actual);
+    }
+    ir.close();
+    
+    // delete some docs
+    int numDeletions = random().nextInt(numDocs/10);
+    for (int i = 0; i < numDeletions; i++) {
+      int id = random().nextInt(numDocs);
+      writer.deleteDocuments(new Term("id", Integer.toString(id)));
+    }
+    
+    writer.forceMerge(1);
+    
+    // now compare again after the merge: the merge applies the deletes, so both views must still agree
+    ir = writer.getReader();
+    AtomicReader ar = getOnlySegmentReader(ir);
+    SortedSetDocValues expected = FieldCache.DEFAULT.getDocTermOrds(ar, "indexed");
+    SortedSetDocValues actual = ar.getSortedSetDocValues("dv");
+    assertEquals(ir.maxDoc(), expected, actual);
+    ir.close();
+    
+    writer.close();
+    dir.close();
+  }
+  
+  public void testSortedSetFixedLengthVsUninvertedField() throws Exception {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    int numIterations = atLeast(1);
+    for (int i = 0; i < numIterations; i++) {
+      int fixedLength = _TestUtil.nextInt(random(), 1, 10);
+      doTestSortedSetVsUninvertedField(fixedLength, fixedLength);
+    }
+  }
+  
+  public void testSortedSetVariableLengthVsUninvertedField() throws Exception {
+    assumeTrue("Codec does not support SORTED_SET", defaultCodecSupportsSortedSet());
+    int numIterations = atLeast(1);
+    for (int i = 0; i < numIterations; i++) {
+      doTestSortedSetVsUninvertedField(1, 10);
+    }
+  }
 }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/RandomCodec.java b/lucene/test-framework/src/java/org/apache/lucene/index/RandomCodec.java
index ddaf32a..86258c0 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/RandomCodec.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/RandomCodec.java
@@ -154,8 +154,13 @@
     Collections.shuffle(dvFormats, random);
 
     // Avoid too many open files:
-    formats = formats.subList(0, 4);
-    dvFormats = dvFormats.subList(0, 4);
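+    // guard the subList calls: with fewer than 4 formats, subList(0, 4) would throw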
+    if (formats.size() > 4) {
+      formats = formats.subList(0, 4);
+    }
+    if (dvFormats.size() > 4) {
+      dvFormats = dvFormats.subList(0, 4);
+    }
   }
 
   public RandomCodec(Random random) {
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index 4028fd3..1b674cf 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -26,6 +26,7 @@
 import java.util.logging.Logger;
 
 import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.FieldType;
@@ -1298,4 +1299,14 @@
       throw new IOException("Cannot find resource: " + name);
     }
   }
+  
+  /** Returns true if the default codec supports SORTED_SET docvalues */ 
+  public static boolean defaultCodecSupportsSortedSet() {
+    String name = Codec.getDefault().getName();
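+    // the 4.0 and 4.1 docvalues formats predate SORTED_SET support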
+    if (name.equals("Lucene40") || name.equals("Lucene41")) {
+      return false;
+    }
+    return true;
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/search/TestDocSet.java b/solr/core/src/test/org/apache/solr/search/TestDocSet.java
index c5769aa..4042463 100644
--- a/solr/core/src/test/org/apache/solr/search/TestDocSet.java
+++ b/solr/core/src/test/org/apache/solr/search/TestDocSet.java
@@ -33,6 +33,7 @@
 import org.apache.lucene.index.MultiReader;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.index.StoredFieldVisitor;
 import org.apache.lucene.search.DocIdSet;
 import org.apache.lucene.search.DocIdSetIterator;
@@ -401,6 +402,11 @@
       public SortedDocValues getSortedDocValues(String field) {
         return null;
       }
+      
+      @Override
+      public SortedSetDocValues getSortedSetDocValues(String field) {
+        return null;
+      }
 
       @Override
       public NumericDocValues getNormValues(String field) {