Create compaction-stress tool

patch by tjake; reviewed by Marcus Eriksson for CASSANDRA-11844
diff --git a/CHANGES.txt b/CHANGES.txt
index 760cc58..f759b7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Create compaction-stress tool (CASSANDRA-11844)
  * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
  * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
  * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
diff --git a/NEWS.txt b/NEWS.txt
index 85f2767..069c93b 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -19,6 +19,8 @@
 
 New features
 ------------
+   - A new compaction-stress tool has been added to test the throughput of compaction
+     for any cassandra-stress user schema. See 'compaction-stress help' for usage.
    - Compaction can now take into account overlapping tables that don't take part
      in the compaction to look for deleted or overwritten data in the compacted tables.
      When such data is found, it can be safely discarded, which in turn should enable
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 20dac1e..53f5305 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -381,17 +381,6 @@
         }
     }
 
-    private ColumnFamilyStore(Keyspace keyspace,
-                             String columnFamilyName,
-                             int generation,
-                             CFMetaData metadata,
-                             Directories directories,
-                             boolean loadSSTables)
-    {
-        this(keyspace, columnFamilyName, generation, metadata, directories, loadSSTables, true);
-    }
-
-
     @VisibleForTesting
     public ColumnFamilyStore(Keyspace keyspace,
                               String columnFamilyName,
@@ -399,7 +388,8 @@
                               CFMetaData metadata,
                               Directories directories,
                               boolean loadSSTables,
-                              boolean registerBookkeeping)
+                              boolean registerBookkeeping,
+                              boolean offline)
     {
         assert directories != null;
         assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
@@ -428,8 +418,20 @@
             data.addInitialSSTables(sstables);
         }
 
+        /*
+         * When creating a CFS offline we bypass the default directory logic added by
+         * CASSANDRA-8671 and keep the passed-in directories so the compaction
+         * strategy picks them up.
+         */
+        if (offline)
+            this.directories = directories;
+        else
+            this.directories = new Directories(metadata, Directories.dataDirectories);
+
         // compaction strategy should be created after the CFS has been prepared
         compactionStrategyManager = new CompactionStrategyManager(this);
+
+        // Since the compaction strategy can redefine the data directories, re-init them here
         this.directories = compactionStrategyManager.getDirectories();
 
         if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
@@ -442,7 +444,7 @@
         for (IndexMetadata info : metadata.getIndexes())
             indexManager.addIndex(info);
 
         if (registerBookkeeping)
         {
             // register the mbean
             mbeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s",
@@ -496,10 +498,7 @@
 
     public Directories getDirectories()
     {
-        // todo, hack since we need to know the data directories when constructing the compaction strategy
-        if (directories != null)
-            return directories;
-        return new Directories(metadata, initialDirectories);
+        return directories;
     }
 
     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
@@ -588,8 +587,20 @@
                                                                          CFMetaData metadata,
                                                                          boolean loadSSTables)
     {
-        // get the max generation number, to prevent generation conflicts
         Directories directories = new Directories(metadata, initialDirectories);
+        return createColumnFamilyStore(keyspace, columnFamily, metadata, directories, loadSSTables, true, false);
+    }
+
+    /** This is only directly used by offline tools */
+    public static synchronized ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace,
+                                                                         String columnFamily,
+                                                                         CFMetaData metadata,
+                                                                         Directories directories,
+                                                                         boolean loadSSTables,
+                                                                         boolean registerBookkeeping,
+                                                                         boolean offline)
+    {
+        // get the max generation number, to prevent generation conflicts
         Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true);
         List<Integer> generations = new ArrayList<Integer>();
         for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
@@ -598,19 +609,19 @@
             generations.add(desc.generation);
             if (!desc.isCompatible())
                 throw new RuntimeException(String.format("Incompatible SSTable found. Current version %s is unable to read file: %s. Please run upgradesstables.",
-                        desc.getFormat().getLatestVersion(), desc));
+                                                         desc.getFormat().getLatestVersion(), desc));
         }
         Collections.sort(generations);
         int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0;
 
-        return new ColumnFamilyStore(keyspace, columnFamily, value, metadata, directories, loadSSTables);
+        return new ColumnFamilyStore(keyspace, columnFamily, value, metadata, directories, loadSSTables, registerBookkeeping, offline);
     }
 
     /**
      * Removes unnecessary files from the cf directory at startup: these include temp files, orphans, zero-length files
      * and compacted sstables. Files that cannot be recognized will be ignored.
      */
     public static void scrubDataDirectories(CFMetaData metadata) throws StartupException
     {
         Directories directories = new Directories(metadata, initialDirectories);
         Set<File> cleanedDirectories = new HashSet<>();
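
For context, a minimal sketch of how an offline tool might wire up the new overload, mirroring what CQLSSTableWriter.Builder.createOfflineTable does later in this patch (the class and method names here are illustrative, not part of the patch):

    import java.io.File;
    import java.util.Collections;

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.Directories;
    import org.apache.cassandra.db.Keyspace;

    public class OfflineCfsSketch
    {
        public static ColumnFamilyStore open(Keyspace ks, CFMetaData metadata, File dataDir)
        {
            // Wrap the tool-supplied directory; with offline=true the CFS keeps it
            // instead of falling back to Directories.dataDirectories from cassandra.yaml.
            Directories dirs = new Directories(metadata,
                    Collections.singletonList(new Directories.DataDirectory(dataDir)));

            // loadSSTables=false, registerBookkeeping=false (no mbeans), offline=true
            ColumnFamilyStore cfs =
                ColumnFamilyStore.createColumnFamilyStore(ks, metadata.cfName, metadata,
                                                          dirs, false, false, true);

            ks.initCfCustom(cfs); // register the custom instance (see Keyspace.java below)
            return cfs;
        }
    }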
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 62fd890..3533dbc 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -180,6 +180,12 @@
     {
         this(metadata, dataDirectories);
     }
+
+    public Directories(final CFMetaData metadata, Collection<DataDirectory> paths)
+    {
+        this(metadata, paths.toArray(new DataDirectory[paths.size()]));
+    }
+
     /**
      * Create Directories of given ColumnFamily.
      * SSTable directories are created under data_directories defined in cassandra.yaml if not exist at this time.
@@ -920,6 +926,19 @@
         return result;
     }
 
+    /**
+     * @return Raw size on disk for all directories
+     */
+    public long getRawDirectoriesSize()
+    {
+        long totalAllocatedSize = 0L;
+
+        for (File path : dataPaths)
+            totalAllocatedSize += FileUtils.folderSize(path);
+
+        return totalAllocatedSize;
+    }
+
     public long getTrueAllocatedSizeIn(File input)
     {
         if (!input.isDirectory())
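
As a quick illustration of the new size helper (a hypothetical snippet; cfs is assumed to be an already-open ColumnFamilyStore), this is the sort of call a compaction throughput report can make:

    // Raw on-disk footprint summed across every data directory of the table
    long bytes = cfs.getDirectories().getRawDirectoriesSize();
    System.out.printf("Total bytes on disk: %,d%n", bytes);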
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 37ed1e1..0d78245 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -371,6 +371,30 @@
     }
 
     /**
+     * Registers a custom cf instance with this keyspace.
+     * This is required for offline tools that use non-standard directories.
+     */
+    public void initCfCustom(ColumnFamilyStore newCfs)
+    {
+        ColumnFamilyStore cfs = columnFamilyStores.get(newCfs.metadata.cfId);
+
+        if (cfs == null)
+        {
+            // CFS being created for the first time, either on server startup or new CF being added.
+            // We don't worry about races here; startup is safe, and adding multiple identical CFs
+            // simultaneously is a "don't do that" scenario.
+            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(newCfs.metadata.cfId, newCfs);
+            // CFS mbean instantiation will error out before we hit this, but in case that changes...
+            if (oldCfs != null)
+                throw new IllegalStateException("added multiple mappings for cf id " + newCfs.metadata.cfId);
+        }
+        else
+        {
+            throw new IllegalStateException("CFS is already initialized: " + cfs.name);
+        }
+    }
+
+    /**
      * adds a cf to internal structures (ends up creating disk files).
      */
     public void initCf(CFMetaData metadata, boolean loadSSTables)
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 83592f0..6b5b8a4 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -126,7 +126,7 @@
             uncheckedTombstoneCompaction = DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION;
         }
 
-        directories = new Directories(cfs.metadata, Directories.dataDirectories);
+        directories = cfs.getDirectories();
     }
 
     public Directories getDirectories()
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 2311143..b1eadc5 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -30,6 +30,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -525,9 +526,14 @@
         log.untrackNew(table);
     }
 
-    public static boolean removeUnfinishedLeftovers(CFMetaData metadata)
+    public static boolean removeUnfinishedLeftovers(ColumnFamilyStore cfs)
     {
-        return LogTransaction.removeUnfinishedLeftovers(metadata);
+        return LogTransaction.removeUnfinishedLeftovers(cfs.getDirectories().getCFDirectories());
+    }
+
+    public static boolean removeUnfinishedLeftovers(CFMetaData cfMetaData)
+    {
+        return LogTransaction.removeUnfinishedLeftovers(cfMetaData);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index bbf6fd6..af8983d 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -41,6 +41,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 
 public class ByteOrderedPartitioner implements IPartitioner
 {
@@ -203,9 +204,13 @@
 
     public BytesToken getRandomToken()
     {
-        Random r = new Random();
+        return getRandomToken(ThreadLocalRandom.current());
+    }
+
+    public BytesToken getRandomToken(Random random)
+    {
         byte[] buffer = new byte[16];
-        r.nextBytes(buffer);
+        random.nextBytes(buffer);
         return new BytesToken(buffer);
     }
 
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java
index b559a6f..eb4aafb 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -72,6 +73,13 @@
      */
     public Token getRandomToken();
 
+    /**
+     * @param random instance of Random to use when generating the token
+     *
+     * @return a randomly generated token
+     */
+    public Token getRandomToken(Random random);
+
     public Token.TokenFactory getTokenFactory();
 
     /**
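
Threading a Random through getRandomToken lets callers control determinism: a seeded generator produces the same token stream on every run, which is what a repeatable stress workload needs. A sketch against ByteOrderedPartitioner (note that RandomPartitioner's overload below still mixes in the current time via GuidGenerator, so determinism there is weaker):

    import java.util.Random;

    import org.apache.cassandra.dht.ByteOrderedPartitioner;
    import org.apache.cassandra.dht.Token;

    public class SeededTokenSketch
    {
        public static void main(String[] args)
        {
            Random seeded = new Random(42L); // fixed seed -> identical 16-byte tokens each run
            Token t1 = ByteOrderedPartitioner.instance.getRandomToken(seeded);
            Token t2 = ByteOrderedPartitioner.instance.getRandomToken(seeded);
            System.out.println(t1 + " / " + t2);
        }
    }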
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index f9421c5..9922eb0 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -21,6 +21,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.CachedHashDecoratedKey;
@@ -64,6 +65,11 @@
         throw new UnsupportedOperationException();
     }
 
+    public LocalToken getRandomToken(Random random)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     public Token.TokenFactory getTokenFactory()
     {
         return tokenFactory;
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index 96b4ca0..ab552c4 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.DecoratedKey;
@@ -37,6 +38,8 @@
 
 public class OrderPreservingPartitioner implements IPartitioner
 {
+    private static final String rndchars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+
     public static final StringToken MINIMUM = new StringToken("");
 
     public static final BigInteger CHAR_MASK = new BigInteger("65535");
@@ -106,12 +109,14 @@
 
     public StringToken getRandomToken()
     {
-        String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
-        Random r = new Random();
+        return getRandomToken(ThreadLocalRandom.current());
+    }
+
+    public StringToken getRandomToken(Random random)
+    {
         StringBuilder buffer = new StringBuilder();
-        for (int j = 0; j < 16; j++) {
-            buffer.append(chars.charAt(r.nextInt(chars.length())));
-        }
+        for (int j = 0; j < 16; j++)
+            buffer.append(rndchars.charAt(random.nextInt(rndchars.length())));
         return new StringToken(buffer.toString());
     }
 
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index 96a96ca..c2ec413 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -91,6 +91,14 @@
         return new BigIntegerToken(token);
     }
 
+    public BigIntegerToken getRandomToken(Random random)
+    {
+        BigInteger token = FBUtilities.hashToBigInteger(GuidGenerator.guidAsBytes(random));
+        if ( token.signum() == -1 )
+            token = token.multiply(BigInteger.valueOf(-1L));
+        return new BigIntegerToken(token);
+    }
+
     private final Token.TokenFactory tokenFactory = new Token.TokenFactory() {
         public ByteBuffer toByteArray(Token token)
         {
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 2ed37ee..bd157e9 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -75,7 +75,7 @@
     protected final Configuration conf;
     protected final int maxFailures;
     protected final int bufferSize;
-    protected Closeable writer;
+    protected CQLSSTableWriter writer;
     protected SSTableLoader loader;
     protected Progressable progress;
     protected TaskAttemptContext context;
@@ -174,7 +174,7 @@
             ExternalClient externalClient = new ExternalClient(conf);
             externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
 
-            loader = new SSTableLoader(outputDir, externalClient, new NullOutputHandler())
+            loader = new SSTableLoader(writer.getInnermostDirectory(), externalClient, new NullOutputHandler())
             {
                 @Override
                 public void onSuccess(StreamState finalState)
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 0213fd5..10b4caa 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -22,16 +22,17 @@
 import java.io.IOException;
 import java.io.Closeable;
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.Pair;
@@ -41,16 +42,17 @@
  */
 abstract class AbstractSSTableSimpleWriter implements Closeable
 {
-    protected final File directory;
-    protected final CFMetaData metadata;
+    protected final ColumnFamilyStore cfs;
+    protected final IPartitioner partitioner;
     protected final PartitionColumns columns;
     protected SSTableFormat.Type formatType = DatabaseDescriptor.getSSTableFormat();
     protected static AtomicInteger generation = new AtomicInteger(0);
+    protected boolean makeRangeAware = false;
 
-    protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
+    protected AbstractSSTableSimpleWriter(ColumnFamilyStore cfs, IPartitioner partitioner, PartitionColumns columns)
     {
-        this.metadata = metadata;
-        this.directory = directory;
+        this.cfs = cfs;
+        this.partitioner = partitioner;
         this.columns = columns;
     }
 
@@ -59,15 +61,25 @@
         this.formatType = type;
     }
 
+    protected void setRangeAwareWriting(boolean makeRangeAware)
+    {
+        this.makeRangeAware = makeRangeAware;
+    }
+
     protected SSTableTxnWriter createWriter()
     {
-        return SSTableTxnWriter.create(metadata,
-                                       createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
+        SerializationHeader header = new SerializationHeader(true, cfs.metadata, columns, EncodingStats.NO_STATS);
+
+        if (makeRangeAware)
+            return SSTableTxnWriter.createRangeAware(cfs, 0, ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header);
+
+        return SSTableTxnWriter.create(cfs,
+                                       createDescriptor(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata.ksName, cfs.metadata.cfName, formatType),
                                        0,
                                        ActiveRepairService.UNREPAIRED_SSTABLE,
                                        0,
-                                       new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS),
-                                       Collections.emptySet());
+                                       header);
     }
 
     private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
@@ -107,7 +119,7 @@
 
     PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException
     {
-        return getUpdateFor(metadata.decorateKey(key));
+        return getUpdateFor(partitioner.decorateKey(key));
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 76c0e19..a6805df 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -21,16 +21,12 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import com.datastax.driver.core.ProtocolVersion;
 import com.datastax.driver.core.TypeCodec;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -44,8 +40,7 @@
 import org.apache.cassandra.cql3.statements.CreateTypeStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.dht.IPartitioner;
@@ -54,10 +49,12 @@
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.locator.SimpleSnitch;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -333,14 +330,30 @@
         return codec.serialize(value, ProtocolVersion.NEWEST_SUPPORTED);
     }
     /**
+     * The writer loads data in directories corresponding to how they are laid out on the server:
+     * <p>
+     * {keyspace}/{table-cfid}/
+     *
+     * This method can be used to fetch the innermost directory with the sstable components.
+     * @return The directory containing the sstable components
+     */
+    public File getInnermostDirectory()
+    {
+        return writer.cfs.getDirectories().getDirectoryForNewSSTables();
+    }
+
+    /**
      * A Builder for a CQLSSTableWriter object.
      */
     public static class Builder
     {
-        private File directory;
+        private final List<File> directoryList;
+        private ColumnFamilyStore cfs;
 
         protected SSTableFormat.Type formatType = null;
 
+        private boolean makeRangeAware = false;
+
         private CreateTableStatement.RawStatement schemaStatement;
         private final List<CreateTypeStatement> typeStatements;
         private UpdateStatement.ParsedInsert insertStatement;
@@ -349,8 +362,10 @@
         private boolean sorted = false;
         private long bufferSizeInMB = 128;
 
-        protected Builder() {
+        protected Builder()
+        {
             this.typeStatements = new ArrayList<>();
+            this.directoryList = new ArrayList<>();
         }
 
         /**
@@ -373,7 +388,7 @@
          * <p>
          * This is a mandatory option.
          *
-         * @param directory the directory to use, which should exists and be writable.
+         * @param directory the directory to use, which should exist and be writable.
          * @return this builder.
          *
          * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
@@ -385,10 +400,29 @@
             if (!directory.canWrite())
                 throw new IllegalArgumentException(directory + " exists but is not writable");
 
-            this.directory = directory;
+            directoryList.add(directory);
             return this;
         }
 
+        /**
+         * Use a pre-instantiated ColumnFamilyStore instead of creating one from a schema.
+         * <p>
+         * This can be used in place of inDirectory and forTable.
+         *
+         * @see #inDirectory(File)
+         *
+         * @param cfs the ColumnFamilyStore to write to; its data directories should exist and be writable.
+         * @return this builder.
+         */
+        public Builder withCfs(ColumnFamilyStore cfs)
+        {
+            this.cfs = cfs;
+            return this;
+        }
+
         public Builder withType(String typeDefinition) throws SyntaxException
         {
             typeStatements.add(parseStatement(typeDefinition, CreateTypeStatement.class, "CREATE TYPE"));
@@ -431,6 +465,20 @@
             return this;
         }
 
+        /**
+         * Specify whether the sstable writer should be vnode range aware.
+         * If enabled, one sstable is created per vnode range.
+         *
+         * @param makeRangeAware true to split the output into one sstable per vnode range.
+         * @return this builder.
+         */
+        public Builder rangeAware(boolean makeRangeAware)
+        {
+            this.makeRangeAware = makeRangeAware;
+            return this;
+        }
+
         /**
          * The INSERT statement defining the order of the values to add for a given CQL row.
          * <p>
@@ -499,36 +547,36 @@
         @SuppressWarnings("resource")
         public CQLSSTableWriter build()
         {
-            if (directory == null)
-                throw new IllegalStateException("No ouptut directory specified, you should provide a directory with inDirectory()");
-            if (schemaStatement == null)
+            if (directoryList.isEmpty() && cfs == null)
+                throw new IllegalStateException("No output directories specified, you should provide a directory with inDirectory()");
+            if (schemaStatement == null && cfs == null)
                 throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()");
             if (insertStatement == null)
                 throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
 
             synchronized (CQLSSTableWriter.class)
             {
-                String keyspace = schemaStatement.keyspace();
+                if (cfs == null)
+                    cfs = createOfflineTable(schemaStatement, typeStatements, directoryList);
 
-                if (Schema.instance.getKSMetaData(keyspace) == null)
-                    Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)));
+                if (partitioner == null)
+                    partitioner = cfs.getPartitioner();
 
-                createTypes(keyspace);
-                CFMetaData cfMetaData = createTable(keyspace);
                 Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert = prepareInsert();
-
                 AbstractSSTableSimpleWriter writer = sorted
-                                                     ? new SSTableSimpleWriter(directory, cfMetaData, preparedInsert.left.updatedColumns())
-                                                     : new SSTableSimpleUnsortedWriter(directory, cfMetaData, preparedInsert.left.updatedColumns(), bufferSizeInMB);
+                                                     ? new SSTableSimpleWriter(cfs, partitioner, preparedInsert.left.updatedColumns())
+                                                     : new SSTableSimpleUnsortedWriter(cfs, partitioner, preparedInsert.left.updatedColumns(), bufferSizeInMB);
 
                 if (formatType != null)
                     writer.setSSTableFormatType(formatType);
 
+                writer.setRangeAwareWriting(makeRangeAware);
+
                 return new CQLSSTableWriter(writer, preparedInsert.left, preparedInsert.right);
             }
         }
 
-        private void createTypes(String keyspace)
+        private static void createTypes(String keyspace, List<CreateTypeStatement> typeStatements)
         {
             KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
             Types.RawBuilder builder = Types.rawBuilder(keyspace);
@@ -538,31 +586,50 @@
             ksm = ksm.withSwapped(builder.build());
             Schema.instance.setKeyspaceMetadata(ksm);
         }
+
+        public static ColumnFamilyStore createOfflineTable(String schema, List<File> directoryList)
+        {
+            return createOfflineTable(parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"), Collections.emptyList(), directoryList);
+        }
+
         /**
          * Creates the table according to schema statement
-         *
-         * @param keyspace name of the keyspace where table should be created
+         * with the specified data directories.
          */
-        private CFMetaData createTable(String keyspace)
+        public static ColumnFamilyStore createOfflineTable(CreateTableStatement.RawStatement schemaStatement, List<CreateTypeStatement> typeStatements, List<File> directoryList)
         {
+            String keyspace = schemaStatement.keyspace();
+
+            if (Schema.instance.getKSMetaData(keyspace) == null)
+                Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)));
+
+            createTypes(keyspace, typeStatements);
+
             KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
 
             CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily());
-            if (cfMetaData == null)
-            {
-                CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement;
-                statement.validate(ClientState.forInternalCalls());
+            assert cfMetaData == null;
 
-                cfMetaData = statement.getCFMetaData();
+            CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement;
+            statement.validate(ClientState.forInternalCalls());
 
-                Schema.instance.load(cfMetaData);
-                Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData)));
-            }
+            // Build metadata with a portable cfId
+            cfMetaData = statement.metadataBuilder()
+                                  .withId(CFMetaData.generateLegacyCfId(keyspace, statement.columnFamily()))
+                                  .build()
+                                  .params(statement.params());
 
-            if (partitioner != null)
-                return cfMetaData.copy(partitioner);
-            else
-                return cfMetaData;
+            Keyspace.setInitialized();
+            Directories directories = new Directories(cfMetaData, directoryList.stream().map(Directories.DataDirectory::new).collect(Collectors.toList()));
+
+            Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
+            ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(ks, cfMetaData.cfName, cfMetaData, directories, false, false, true);
+
+            ks.initCfCustom(cfs);
+            Schema.instance.load(cfs.metadata);
+            Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfs.metadata)));
+
+            return cfs;
         }
 
         /**
@@ -587,7 +654,7 @@
         }
     }
 
-    private static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type)
+    public static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type)
     {
         try
         {
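
Taken together, the builder changes enable a flow like the following sketch; the schema, path, and values are illustrative, and client-mode/DatabaseDescriptor initialization is omitted (the tests below show the full setup):

    import java.io.File;
    import java.util.Collections;

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.io.sstable.CQLSSTableWriter;

    public class OfflineWriterSketch
    {
        public static void main(String[] args) throws Exception
        {
            // Create the table once with explicit data directories...
            String schema = "CREATE TABLE ks.tbl (k int PRIMARY KEY, v int)";
            ColumnFamilyStore cfs = CQLSSTableWriter.Builder.createOfflineTable(
                    schema, Collections.singletonList(new File("/tmp/ks-data")));

            // ...then hand the CFS to the writer instead of inDirectory()/forTable()
            CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                      .withCfs(cfs)
                                                      .using("INSERT INTO ks.tbl (k, v) VALUES (?, ?)")
                                                      .rangeAware(true) // one sstable per vnode range
                                                      .build();
            writer.addRow(1, 2);
            writer.close();

            // Finished components land under {dir}/{keyspace}/{table-cfid}/
            System.out.println("sstables in " + writer.getInnermostDirectory());
        }
    }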
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 043f6fa..15dd925 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -56,13 +56,25 @@
 
     public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
     {
-        this(directory, client, outputHandler, 1);
+        this(directory, client, outputHandler, directory.getParentFile().getName(), 1);
     }
 
     public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost)
     {
+        this(directory, client, outputHandler, directory.getParentFile().getName(), connectionsPerHost);
+    }
+
+    public SSTableLoader(File directory, Client client, OutputHandler outputHandler, String keyspace)
+    {
+        this(directory, client, outputHandler, keyspace, 1);
+    }
+
+    public SSTableLoader(File directory, Client client, OutputHandler outputHandler, String keyspace, int connectionsPerHost)
+    {
         this.directory = directory;
-        this.keyspace = directory.getParentFile().getName();
+        this.keyspace = keyspace;
         this.client = client;
         this.outputHandler = outputHandler;
         this.connectionsPerHost = connectionsPerHost;
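
Because the writer now nests output under {keyspace}/{table-cfid}/, the old trick of deriving the keyspace from the parent directory name no longer fits every layout; the new constructors make it explicit. A sketch (the keyspace name is illustrative):

    import org.apache.cassandra.io.sstable.CQLSSTableWriter;
    import org.apache.cassandra.io.sstable.SSTableLoader;
    import org.apache.cassandra.utils.OutputHandler;

    class LoaderSketch
    {
        static SSTableLoader loaderFor(CQLSSTableWriter writer, SSTableLoader.Client client)
        {
            // Keyspace is passed explicitly; connectionsPerHost defaults to 1
            return new SSTableLoader(writer.getInnermostDirectory(),
                                     client,
                                     new OutputHandler.SystemOutput(false, false),
                                     "my_keyspace");
        }
    }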
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index fa88817..2563f26 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
@@ -34,6 +35,7 @@
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredSerializer;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -60,11 +62,11 @@
     private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>();
     private final DiskWriter diskWriter = new DiskWriter();
 
-    SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, PartitionColumns columns, long bufferSizeInMB)
+    SSTableSimpleUnsortedWriter(ColumnFamilyStore cfs, IPartitioner partitioner, PartitionColumns columns, long bufferSizeInMB)
     {
-        super(directory, metadata, columns);
+        super(cfs, partitioner, columns);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
-        this.header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS);
+        this.header = new SerializationHeader(true, cfs.metadata, columns, EncodingStats.NO_STATS);
         diskWriter.start();
     }
 
@@ -110,7 +112,7 @@
 
     private PartitionUpdate createPartitionUpdate(DecoratedKey key)
     {
-        return new PartitionUpdate(metadata, key, columns, 4)
+        return new PartitionUpdate(cfs.metadata, key, columns, 4)
         {
             @Override
             public void add(Row row)
@@ -204,7 +206,7 @@
                     if (b == SENTINEL)
                         return;
 
-                        try (SSTableTxnWriter writer = createWriter())
+                    try (SSTableTxnWriter writer = createWriter())
                     {
                         for (Map.Entry<DecoratedKey, PartitionUpdate> entry : b.entrySet())
                             writer.append(entry.getValue().unfilteredIterator());
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 7fbd79d..2f6dd33 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -19,12 +19,14 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.IPartitioner;
 
 /**
  * A SSTable writer that assumes rows are in (partitioner) sorted order.
@@ -43,9 +45,9 @@
 
     private SSTableTxnWriter writer;
 
-    protected SSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
+    protected SSTableSimpleWriter(ColumnFamilyStore cfs, IPartitioner partitioner, PartitionColumns columns)
     {
-        super(directory, metadata, columns);
+        super(cfs, partitioner, columns);
     }
 
     private SSTableTxnWriter getOrCreateWriter()
@@ -67,7 +69,7 @@
             if (update != null)
                 writePartition(update);
             currentKey = key;
-            update = new PartitionUpdate(metadata, currentKey, columns, 4);
+            update = new PartitionUpdate(cfs.metadata, currentKey, columns, 4);
         }
 
         assert update != null;
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 09f6a55..5ffde15 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.io.sstable;
 
+import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -27,6 +28,8 @@
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.concurrent.Transactional;
@@ -101,6 +104,25 @@
         return new SSTableTxnWriter(txn, writer);
     }
 
+    public static SSTableTxnWriter createRangeAware(ColumnFamilyStore cfs, long keyCount, long repairedAt, SSTableFormat.Type type, int sstableLevel, SerializationHeader header)
+    {
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
+        SSTableMultiWriter writer;
+        try
+        {
+            writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, type, sstableLevel, 0, txn, header);
+        }
+        catch (IOException e)
+        {
+            // We pass 0 for the total size since we don't know it, so this should never happen
+            throw new RuntimeException(e);
+        }
+
+        return new SSTableTxnWriter(txn, writer);
+    }
+
     @SuppressWarnings("resource") // log and writer closed during postCleanup
     public static SSTableTxnWriter create(CFMetaData cfm,
                                           Descriptor descriptor,
diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
index 69fcbab..411dc23 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
@@ -67,12 +67,15 @@
             }
         }
         System.out.println();
+        reportCompactionTable(cm.getCompactions(), probe.getCompactionThroughput(), humanReadable);
+    }
+
+    public static void reportCompactionTable(List<Map<String,String>> compactions, int compactionThroughput, boolean humanReadable)
+    {
         long remainingBytes = 0;
         TableBuilder table = new TableBuilder();
-        List<Map<String, String>> compactions = cm.getCompactions();
         if (!compactions.isEmpty())
         {
-            int compactionThroughput = probe.getCompactionThroughput();
             table.add("id", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
             for (Map<String, String> c : compactions)
             {
@@ -101,4 +104,5 @@
             System.out.printf("%25s%10s%n", "Active compaction remaining time : ", remainingTime);
         }
     }
 }
\ No newline at end of file
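
With the table-rendering logic now static, an offline tool running in the same JVM as the compactions can print the same progress table without a NodeProbe. A sketch (passing 0 for the throughput, which is assumed here to simply omit the remaining-time estimate):

    import java.util.List;
    import java.util.Map;

    import org.apache.cassandra.db.compaction.CompactionManager;
    import org.apache.cassandra.tools.nodetool.CompactionStats;

    class ProgressSketch
    {
        static void report()
        {
            List<Map<String, String>> compactions = CompactionManager.instance.getCompactions();
            // humanReadable=true formats sizes; throughput 0 => no remaining-time estimate
            CompactionStats.reportCompactionTable(compactions, 0, true);
        }
    }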
diff --git a/src/java/org/apache/cassandra/utils/GuidGenerator.java b/src/java/org/apache/cassandra/utils/GuidGenerator.java
index 33f7330..1e523ea 100644
--- a/src/java/org/apache/cassandra/utils/GuidGenerator.java
+++ b/src/java/org/apache/cassandra/utils/GuidGenerator.java
@@ -69,12 +69,12 @@
         return convertToStandardFormat( sb.toString() );
     }
 
-    public static ByteBuffer guidAsBytes()
+    public static ByteBuffer guidAsBytes(Random random)
     {
         StringBuilder sbValueBeforeMD5 = new StringBuilder();
         long time = System.currentTimeMillis();
         long rand = 0;
-        rand = myRand.nextLong();
+        rand = random.nextLong();
         sbValueBeforeMD5.append(s_id)
                         .append(":")
                         .append(Long.toString(time))
@@ -85,6 +85,11 @@
         return ByteBuffer.wrap(FBUtilities.threadLocalMD5Digest().digest(valueBeforeMD5.getBytes()));
     }
 
+    public static ByteBuffer guidAsBytes()
+    {
+        return guidAsBytes(myRand);
+    }
+
     /*
         * Convert to the standard format for GUID
         * Example: C2FEEEAC-CFCD-11D1-8B05-00600806D9B6
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index 7e53ba2..300be11 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -95,7 +95,7 @@
         writer.close();
         System.err.println(String.format("Writer finished after %d seconds....", TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start)));
 
-        File[] dataFiles = dataDir.listFiles((dir, name) -> name.endsWith("-Data.db"));
+        File[] dataFiles = writer.getInnermostDirectory().listFiles((dir, name) -> name.endsWith("-Data.db"));
         long dataSize = 0l;
         for (File file : dataFiles)
         {
@@ -103,7 +103,7 @@
             dataSize += file.length();
         }
 
-        SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
+        SSTableLoader loader = new SSTableLoader(writer.getInnermostDirectory(), new SSTableLoader.Client()
         {
             private String ks;
             public void init(String keyspace)
@@ -130,7 +130,7 @@
 
 
         //Stream again
-        loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
+        loader = new SSTableLoader(writer.getInnermostDirectory(), new SSTableLoader.Client()
         {
             private String ks;
             public void init(String keyspace)
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index 7f9de55..804bccb 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -137,7 +137,7 @@
     {
         String cfname = "mockcf" + (id.incrementAndGet());
         CFMetaData metadata = newCFMetaData(ksname, cfname);
-        return new ColumnFamilyStore(ks, cfname, 0, metadata, new Directories(metadata), false, false);
+        return new ColumnFamilyStore(ks, cfname, 0, metadata, new Directories(metadata), false, false, false);
     }
 
     public static CFMetaData newCFMetaData(String ksname, String cfname)
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 595610e..515ce18 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -131,12 +131,10 @@
     {
         cfs.truncateBlocking();
 
-        String schema = "CREATE TABLE \"%s\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
         String query = "INSERT INTO \"%s\".\"%s\" (key, name, val) VALUES (?, ?, ?)";
 
         try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
-                                                       .inDirectory(cfs.getDirectories().getDirectoryForNewSSTables())
-                                                       .forTable(String.format(schema, cfs.keyspace.getName(), cfs.name))
+                                                       .withCfs(cfs)
                                                        .using(String.format(query, cfs.keyspace.getName(), cfs.name))
                                                        .build())
         {
diff --git a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
index e2202fe..87ba741 100644
--- a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
+++ b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
@@ -20,6 +20,7 @@
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
@@ -69,7 +70,12 @@
 
     public BigIntegerToken getRandomToken()
     {
-        return new BigIntegerToken(BigInteger.valueOf(new Random().nextInt(15)));
+        return getRandomToken(ThreadLocalRandom.current());
+    }
+
+    public BigIntegerToken getRandomToken(Random random)
+    {
+        return new BigIntegerToken(BigInteger.valueOf(random.nextInt(15)));
     }
 
     private final Token.TokenFactory tokenFactory = new Token.TokenFactory() {
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index 6df2d65..1cc303c 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 
 import com.google.common.io.Files;
+import org.apache.commons.lang.ArrayUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -30,6 +31,7 @@
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.tools.Util;
 
 import static org.junit.Assert.assertEquals;
 
@@ -41,6 +43,7 @@
     public void setUp()
     {
         this.testDirectory = Files.createTempDir();
+        Util.initDatabaseDescriptor();
         Config.setClientMode(true);
     }
 
@@ -86,16 +89,10 @@
         writer.close();
         writer2.close();
 
-        FilenameFilter filter = new FilenameFilter()
-        {
-            @Override
-            public boolean accept(File dir, String name)
-            {
-                return name.endsWith("-Data.db");
-            }
-        };
+        FilenameFilter filter = (dir, name) -> name.endsWith("-Data.db");
 
-        File[] dataFiles = this.testDirectory.listFiles(filter);
+        File[] dataFiles = (File[])ArrayUtils.addAll(writer2.getInnermostDirectory().listFiles(filter),
+                                                     writer.getInnermostDirectory().listFiles(filter));
         assertEquals(2, dataFiles.length);
     }
 }
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index caa92f6..877ca11 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -36,6 +36,7 @@
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.UDHelper;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
@@ -96,7 +97,7 @@
 
             writer.close();
 
-            loadSSTables(dataDir, KS);
+            loadSSTables(writer.getInnermostDirectory(), KS);
 
             UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;");
             assertEquals(4, rs.size());
@@ -186,7 +187,7 @@
                 return name.endsWith("-Data.db");
             }
         };
-        assert dataDir.list(filterDataFiles).length > 1 : Arrays.toString(dataDir.list(filterDataFiles));
+        assert writer.getInnermostDirectory().list(filterDataFiles).length > 1 : Arrays.toString(writer.getInnermostDirectory().list(filterDataFiles));
     }
 
 
@@ -220,28 +221,22 @@
     private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
     private class WriterThread extends Thread
     {
-        private final File dataDir;
         private final int id;
+        private final ColumnFamilyStore cfs;
         public volatile Exception exception;
 
-        public WriterThread(File dataDir, int id)
+        public WriterThread(ColumnFamilyStore cfs, int id)
         {
-            this.dataDir = dataDir;
+            this.cfs = cfs;
             this.id = id;
         }
 
         @Override
         public void run()
         {
-            String schema = "CREATE TABLE cql_keyspace2.table2 ("
-                    + "  k int,"
-                    + "  v int,"
-                    + "  PRIMARY KEY (k, v)"
-                    + ")";
             String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)";
             CQLSSTableWriter writer = CQLSSTableWriter.builder()
-                    .inDirectory(dataDir)
-                    .forTable(schema)
+                    .withCfs(cfs)
                     .using(insert).build();
 
             try
@@ -269,10 +264,17 @@
         File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
         assert dataDir.mkdirs();
 
+        String schema = "CREATE TABLE cql_keyspace2.table2 ("
+                        + "  k int,"
+                        + "  v int,"
+                        + "  PRIMARY KEY (k, v)"
+                        + ")";
+        ColumnFamilyStore cfs = CQLSSTableWriter.Builder.createOfflineTable(schema, Collections.singletonList(dataDir));
+
         WriterThread[] threads = new WriterThread[5];
         for (int i = 0; i < threads.length; i++)
         {
-            WriterThread thread = new WriterThread(dataDir, i);
+            WriterThread thread = new WriterThread(cfs, i);
             threads[i] = thread;
             thread.start();
         }
@@ -287,7 +289,7 @@
             }
         }
 
-        loadSSTables(dataDir, KS);
+        loadSSTables(cfs.getDirectories().getDirectoryForNewSSTables(), KS);
 
         UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;");
         assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
@@ -339,7 +341,7 @@
         }
 
         writer.close();
-        loadSSTables(dataDir, KS);
+        loadSSTables(writer.getInnermostDirectory(), KS);
 
         UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
         TypeCodec collectionCodec = UDHelper.codecFor(DataType.CollectionType.frozenList(tuple2Type));
@@ -410,7 +412,7 @@
         }
 
         writer.close();
-        loadSSTables(dataDir, KS);
+        loadSSTables(writer.getInnermostDirectory(), KS);
 
         UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
 
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index 72c7467..ba7571f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
@@ -33,6 +34,8 @@
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.marshal.AsciiType;
@@ -122,6 +125,8 @@
         String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
         String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
 
+        File outputDir;
         try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                        .inDirectory(dataDir)
                                                        .forTable(String.format(schema, KEYSPACE1, CF_STANDARD1))
@@ -129,22 +134,22 @@
                                                        .build())
         {
             writer.addRow("key1", "col1", "100");
+            outputDir = writer.getInnermostDirectory();
         }
 
-        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
-        cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
-
         final CountDownLatch latch = new CountDownLatch(1);
-        SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
+        SSTableLoader loader = new SSTableLoader(outputDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
         loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
-        List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
+        UntypedResultSet rs = QueryProcessor.executeInternal(String.format("SELECT * FROM %s.%s;", KEYSPACE1, CF_STANDARD1));
 
-        assertEquals(1, partitions.size());
-        assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
-        assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
-                                                                   .getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val")))
-                                                                   .value());
+        assertEquals(1, rs.size());
+
+        Iterator<UntypedResultSet.Row> iter = rs.iterator();
+        UntypedResultSet.Row row;
+
+        row = iter.next();
+        assertEquals("key1", row.getString("key"));
 
         // The stream future is signalled when the work is complete but before releasing references. Wait for release
         // before cleanup (CASSANDRA-10118).
@@ -160,8 +165,9 @@
         //make sure we have no tables...
         assertTrue(dataDir.listFiles().length == 0);
 
-        String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
-        String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
+        //Since this runs in the same JVM we need to use a temporary keyspace
+        String schema = "CREATE TABLE \"%stmp\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name)) WITH compression = {}";
+        String query = "INSERT INTO \"%stmp\".\"%s\" (key, name, val) VALUES (?, ?, ?)";
 
         CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                   .inDirectory(dataDir)
@@ -170,7 +176,7 @@
                                                   .withBufferSizeInMB(1)
                                                   .build();
 
-        int NB_PARTITIONS = 5000; // Enough to write >1MB and get at least one completed sstable before we've closed the writer
+        int NB_PARTITIONS = 4200; // Enough to write >1MB and get at least one completed sstable before we've closed the writer
 
         for (int i = 0; i < NB_PARTITIONS; i++)
         {
@@ -182,11 +188,11 @@
         cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
 
         //make sure we have some tables...
-        assertTrue(dataDir.listFiles().length > 0);
+        assertTrue(writer.getInnermostDirectory().listFiles().length > 0);
 
         final CountDownLatch latch = new CountDownLatch(2);
         //writer is still open so loader should not load anything
-        SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
+        SSTableLoader loader = new SSTableLoader(writer.getInnermostDirectory(), new TestClient(), new OutputHandler.SystemOutput(false, false), KEYSPACE1);
         loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
         List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
@@ -196,7 +202,7 @@
         // now we complete the write and the second loader should load the last sstable as well
         writer.close();
 
-        loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
+        loader = new SSTableLoader(writer.getInnermostDirectory(), new TestClient(), new OutputHandler.SystemOutput(false, false), KEYSPACE1);
         loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
         partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
diff --git a/tools/bin/compaction-stress b/tools/bin/compaction-stress
new file mode 100755
index 0000000..f169f2f
--- /dev/null
+++ b/tools/bin/compaction-stress
@@ -0,0 +1,57 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
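+# Usage (subcommands and options are defined in CompactionStress.java):
+#   compaction-stress <write|compact|help> -p <profile.yaml> -d <datadir> [options]
+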
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    # Locations (in order) to use when searching for an include file.
+    for include in "`dirname "$0"`/cassandra.in.sh" \
+                   "$HOME/.cassandra.in.sh" \
+                   /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA="$JAVA_HOME/bin/java"
+else
+    JAVA="`which java`"
+fi
+
+if [ "x$JAVA" = "x" ]; then
+    echo "Java executable not found (hint: set JAVA_HOME)" >&2
+    exit 1
+fi
+
+if [ -z "$CLASSPATH" ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+"$JAVA" -server -ea -cp "$CLASSPATH" $JVM_OPTS \
+        -Dcassandra.storagedir="$cassandra_storagedir" \
+        -Dlogback.configurationFile=logback-tools.xml \
+        org.apache.cassandra.stress.CompactionStress "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et
diff --git a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
new file mode 100644
index 0000000..664f8d2
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress;
+
+import java.io.File;
+import java.io.IOError;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import io.airlift.command.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.tools.nodetool.CompactionStats;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+/**
+ * Tool that provides a fast route to writing data to disk for arbitrary
+ * cassandra-stress schemas and compacting it.
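+ *
+ * Example invocations (paths and profile name are illustrative):
+ *   compaction-stress write -d /tmp/compaction -p stress.yaml -g 5
+ *   compaction-stress compact -d /tmp/compaction -p stress.yaml -m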
+ */
+public abstract class CompactionStress implements Runnable
+{
+    @Inject
+    public HelpOption helpOption;
+
+    @Option(name = { "-p", "--profile" }, description = "Path to stress yaml file", required = true)
+    String profile;
+
+    @Option(name = { "-d", "--datadir" }, description = "Data directory (can be used many times to specify multiple data dirs)", required = true)
+    List<String> dataDirs;
+
+    @Option(name = {"-v", "--vnodes"}, description = "number of local tokens to generate (default 256)")
+    Integer numTokens = 256;
+
+    List<File> getDataDirectories()
+    {
+        List<File> dataDirectories = new ArrayList<>(dataDirs.size());
+        for (String dataDir : dataDirs)
+        {
+            File outputDir = new File(dataDir);
+
+            if (!outputDir.exists())
+            {
+                System.err.println("Invalid output dir (missing): " + outputDir);
+                System.exit(1);
+            }
+
+            if (!outputDir.isDirectory())
+            {
+                System.err.println("Invalid output dir (not a directory): " + outputDir);
+                System.exit(2);
+            }
+
+            if (!outputDir.canWrite())
+            {
+                System.err.println("Invalid output dir (no write permissions): " + outputDir);
+                System.exit(3);
+            }
+
+            dataDirectories.add(outputDir);
+        }
+
+        return dataDirectories;
+    }
+
+    ColumnFamilyStore initCf(StressProfile stressProfile, boolean loadSSTables)
+    {
+        Util.initDatabaseDescriptor();
+
+        generateTokens(stressProfile.seedStr, StorageService.instance.getTokenMetadata(), numTokens);
+
+        CreateTableStatement.RawStatement createStatement = stressProfile.getCreateStatement();
+        List<File> dataDirectories = getDataDirectories();
+
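+        //Create the CFS offline so sstables are read from and written to the supplied data directories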
+        ColumnFamilyStore cfs = CQLSSTableWriter.Builder.createOfflineTable(createStatement, Collections.emptyList(), dataDirectories);
+
+        if (loadSSTables)
+        {
+            Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
+            List<SSTableReader> sstables = new ArrayList<>();
+
+            //Open any existing sstables offline
+            for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
+            {
+                Set<Component> components = entry.getValue();
+                if (!components.contains(Component.DATA))
+                    continue;
+
+                try
+                {
+                    SSTableReader sstable = SSTableReader.openNoValidation(entry.getKey(), components, cfs);
+                    sstables.add(sstable);
+                }
+                catch (Exception e)
+                {
+                    JVMStabilityInspector.inspectThrowable(e);
+                    System.err.println(String.format("Error Loading %s: %s", entry.getKey(), e.getMessage()));
+                }
+            }
+
+            cfs.disableAutoCompaction();
+
+            //Register with cfs
+            cfs.addSSTables(sstables);
+        }
+
+        return cfs;
+    }
+
+    StressProfile getStressProfile()
+    {
+        try
+        {
+            File yamlFile = new File(profile);
+            return StressProfile.load(yamlFile.exists() ? yamlFile.toURI() : URI.create(profile));
+        }
+        catch (IOError e)
+        {
+            e.printStackTrace();
+            System.err.println("Invalid profile URI: " + profile);
+            System.exit(4);
+        }
+
+        return null;
+    }
+
+    /**
+     * Populate tokenMetadata consistently across runs.
+     *
+     * We need consistency to write and compact the same data offline
+     * in the case of a range-aware sstable writer.
+     */
+    private void generateTokens(String seed, TokenMetadata tokenMetadata, Integer numTokens)
+    {
+        Random random = new Random(seed.hashCode());
+
+        IPartitioner p = tokenMetadata.partitioner;
+        tokenMetadata.clearUnsafe();
+        InetAddress addr = FBUtilities.getBroadcastAddress();
+        List<Token> tokens = Lists.newArrayListWithCapacity(numTokens);
+        for (int i = 0; i < numTokens; i++)
+            tokens.add(p.getRandomToken(random));
+
+        tokenMetadata.updateNormalTokens(tokens, addr);
+    }
+
+    public abstract void run();
+
+
+    @Command(name = "compact", description = "Compact data in directory")
+    public static class Compaction extends CompactionStress
+    {
+
+        @Option(name = {"-m", "--maximal"}, description = "Force maximal compaction (default true)")
+        Boolean maximal = false;
+
+        @Option(name = {"-t", "--threads"}, description = "Number of compactor threads to use for bg compactions (default 4)")
+        Integer threads = 4;
+
+        public void run()
+        {
+            //Setup
+            SystemKeyspace.finishStartup(); //needed for early-open
+            CompactionManager.instance.setMaximumCompactorThreads(threads);
+            CompactionManager.instance.setCoreCompactorThreads(threads);
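+            //a rate of 0 disables compaction throughput throttling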
+            CompactionManager.instance.setRate(0);
+
+            StressProfile stressProfile = getStressProfile();
+            ColumnFamilyStore cfs = initCf(stressProfile, true);
+            cfs.getCompactionStrategyManager().compactionLogger.enable();
+
+            List<Future<?>> futures = new ArrayList<>(threads);
+            if (maximal)
+            {
+                futures = CompactionManager.instance.submitMaximal(cfs, FBUtilities.nowInSeconds(), false);
+            }
+            else
+            {
+                cfs.enableAutoCompaction();
+                cfs.getCompactionStrategyManager().enable();
+                for (int i = 0; i < threads; i++)
+                    futures.addAll(CompactionManager.instance.submitBackground(cfs));
+            }
+
+            long working;
+            //Report compaction stats while working
+            while ((working = futures.stream().filter(f -> !f.isDone()).count()) > 0 || CompactionManager.instance.getActiveCompactions() > 0 || (!maximal && cfs.getCompactionStrategyManager().getEstimatedRemainingTasks() > 0))
+            {
+                //Resubmit background jobs to keep all compactor threads busy
+                if (!maximal)
+                {
+                    for (long i = working; i < threads; i++)
+                        futures.addAll(CompactionManager.instance.submitBackground(cfs));
+                }
+
+                reportCompactionStats();
+                Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+            }
+
+            System.out.println("Finished! Shutting down...");
+            CompactionManager.instance.forceShutdown();
+
+            //Wait for cleanup to finish before removing unfinished leftovers
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            LifecycleTransaction.removeUnfinishedLeftovers(cfs);
+        }
+    }
+
+    void reportCompactionStats()
+    {
+        System.out.println("========");
+        System.out.println(String.format("Pending compactions: %d\n", CompactionManager.instance.getPendingTasks()));
+        CompactionStats.reportCompactionTable(CompactionManager.instance.getCompactions(), 0, true);
+    }
+
+
+    @Command(name = "write", description = "write data directly to disk")
+    public static class DataWriter extends CompactionStress
+    {
+        private static final double BYTES_IN_GB = 1024 * 1024 * 1024;
+
+        @Option(name = { "-g", "--gbsize"}, description = "Total GB size on disk you wish to write", required = true)
+        Integer totalSizeGb;
+
+        @Option(name = { "-t", "--threads" }, description = "Number of sstable writer threads (default 2)")
+        Integer threads = 2;
+
+        @Option(name = { "-c", "--partition-count"}, description = "Number of partitions to loop over (default 1000000)")
+        Integer partitions = 1000000;
+
+        @Option(name = { "-b", "--buffer-size-mb"}, description = "Buffer in MB writes before writing new sstable (default 128)")
+        Integer bufferSize = 128;
+
+        @Option(name = { "-r", "--range-aware"}, description = "Splits the local ranges in number of data directories and makes sure we never write the same token in two different directories (default true)")
+        Boolean makeRangeAware = true;
+
+        public void run()
+        {
+            StressProfile stressProfile = getStressProfile();
+            ColumnFamilyStore cfs = initCf(stressProfile, false);
+            Directories directories = cfs.getDirectories();
+
+            StressSettings settings = StressSettings.parse(new String[]{ "write", "-pop seq=1.." + partitions });
+            SeedManager seedManager = new SeedManager(settings);
+            PartitionGenerator generator = stressProfile.getOfflineGenerator();
+            WorkManager workManager = new WorkManager.FixedWorkManager(Long.MAX_VALUE);
+
+            ExecutorService executorService = Executors.newFixedThreadPool(threads);
+            CountDownLatch finished = new CountDownLatch(threads);
+
+            for (int i = 0; i < threads; i++)
+            {
+                //Every thread needs its own writer
+                final SchemaInsert insert = stressProfile.getOfflineInsert(null, generator, seedManager, settings);
+                final CQLSSTableWriter tableWriter = insert.createWriter(cfs, bufferSize, makeRangeAware);
+                executorService.submit(() -> {
+                    try
+                    {
+                        insert.runOffline(tableWriter, workManager);
+                    }
+                    catch (Exception e)
+                    {
+                        e.printStackTrace();
+                    }
+                    finally
+                    {
+                        FileUtils.closeQuietly(tableWriter);
+                        finished.countDown();
+                    }
+                });
+            }
+
+            double currentSizeGB;
+            while ((currentSizeGB = directories.getRawDiretoriesSize() / BYTES_IN_GB) < totalSizeGb)
+            {
+                if (finished.getCount() == 0)
+                    break;
+
+                System.out.println(String.format("Written %.2fGB of %dGB", currentSizeGB, totalSizeGb));
+
+                Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
+            }
+
+            workManager.stop();
+            Uninterruptibles.awaitUninterruptibly(finished);
+
+            currentSizeGB = directories.getRawDiretoriesSize() / BYTES_IN_GB;
+            System.out.println(String.format("Finished writing %.2fGB", currentSizeGB));
+        }
+    }
+
+    public static void main(String[] args)
+    {
+        Cli.CliBuilder<Runnable> builder = Cli.<Runnable>builder("compaction-stress")
+                                           .withDescription("benchmark for compaction")
+                                           .withDefaultCommand(Help.class)
+                                           .withCommands(Help.class, DataWriter.class, Compaction.class);
+
+        Cli<Runnable> stress = builder.build();
+
+        try
+        {
+            stress.parse(args).run();
+        }
+        catch (Throwable t)
+        {
+            t.printStackTrace();
+            System.exit(6);
+        }
+
+        System.exit(0);
+    }
+}
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 8b59bda..1964c27 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -29,15 +29,19 @@
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.stress.generate.*;
@@ -57,12 +61,14 @@
 import org.yaml.snakeyaml.constructor.Constructor;
 import org.yaml.snakeyaml.error.YAMLException;
 
+import static org.apache.cassandra.io.sstable.CQLSSTableWriter.parseStatement;
+
 public class StressProfile implements Serializable
 {
     private String keyspaceCql;
     private String tableCql;
     private List<String> extraSchemaDefinitions;
-    private String seedStr;
+    public final String seedStr = "seed for stress";
 
     public String keyspaceName;
     public String tableName;
@@ -96,7 +102,6 @@
         keyspaceCql = yaml.keyspace_definition;
         tableName = yaml.table;
         tableCql = yaml.table_definition;
-        seedStr = "seed for stress";
         queries = yaml.queries;
         tokenRangeQueries = yaml.token_range_queries;
         insert = yaml.insert;
@@ -360,6 +365,87 @@
         return new TokenRangeQuery(timer, settings, tableMetaData, tokenRangeIterator, def, isWarmup);
     }
 
+
+    public PartitionGenerator getOfflineGenerator()
+    {
+        CFMetaData cfMetaData = CFMetaData.compile(tableCql, keyspaceName);
+
+        //Add missing column configs
+        Iterator<ColumnDefinition> it = cfMetaData.allColumnsInSelectOrder();
+        while (it.hasNext())
+        {
+            ColumnDefinition c = it.next();
+            if (!columnConfigs.containsKey(c.name.toString()))
+                columnConfigs.put(c.name.toString(), new GeneratorConfig(seedStr + c.name.toString(), null, null, null));
+        }
+
+        List<Generator> partitionColumns = cfMetaData.partitionKeyColumns().stream()
+                                                     .map(c -> new ColumnInfo(c.name.toString(), c.type.asCQL3Type().toString(), "", columnConfigs.get(c.name.toString())))
+                                                     .map(c -> c.getGenerator())
+                                                     .collect(Collectors.toList());
+
+        List<Generator> clusteringColumns = cfMetaData.clusteringColumns().stream()
+                                                     .map(c -> new ColumnInfo(c.name.toString(), c.type.asCQL3Type().toString(), "", columnConfigs.get(c.name.toString())))
+                                                     .map(c -> c.getGenerator())
+                                                     .collect(Collectors.toList());
+
+        List<Generator> regularColumns = com.google.common.collect.Lists.newArrayList(cfMetaData.partitionColumns().selectOrderIterator()).stream()
+                                                     .map(c -> new ColumnInfo(c.name.toString(), c.type.asCQL3Type().toString(), "", columnConfigs.get(c.name.toString())))
+                                                     .map(c -> c.getGenerator())
+                                                     .collect(Collectors.toList());
+
+        return new PartitionGenerator(partitionColumns, clusteringColumns, regularColumns, PartitionGenerator.Order.ARBITRARY);
+    }
+
+    public CreateTableStatement.RawStatement getCreateStatement()
+    {
+        CreateTableStatement.RawStatement createStatement = parseStatement(tableCql, CreateTableStatement.RawStatement.class, "CREATE TABLE");
+        createStatement.prepareKeyspace(keyspaceName);
+
+        return createStatement;
+    }
+
+    public SchemaInsert getOfflineInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
+    {
+        assert tableCql != null;
+
+        CFMetaData cfMetaData = CFMetaData.compile(tableCql, keyspaceName);
+
+        List<ColumnDefinition> allColumns = com.google.common.collect.Lists.newArrayList(cfMetaData.allColumnsInSelectOrder());
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("INSERT INTO ").append(quoteIdentifier(keyspaceName) + "." + quoteIdentifier(tableName)).append(" (");
+        StringBuilder value = new StringBuilder();
+        for (ColumnDefinition c : allColumns)
+        {
+            sb.append(quoteIdentifier(c.name.toString())).append(", ");
+            value.append("?, ");
+        }
+        sb.delete(sb.lastIndexOf(","), sb.length());
+        value.delete(value.lastIndexOf(","), value.length());
+        sb.append(") ").append("values(").append(value).append(')');
+
+        if (insert == null)
+            insert = new HashMap<>();
+        lowerCase(insert);
+
+        partitions = select(settings.insert.batchsize, "partitions", "fixed(1)", insert, OptionDistribution.BUILDER);
+        selectchance = select(settings.insert.selectRatio, "select", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER);
+        rowPopulation = select(settings.insert.rowPopulationRatio, "row-population", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER);
+
+        if (generator.maxRowCount > 100 * 1000 * 1000)
+            System.err.printf("WARNING: You have defined a schema that permits very large partitions (%.0f max rows (>100M))%n", generator.maxRowCount);
+
+        String statement = sb.toString();
+
+        //CQLSSTableWriter requires the keyspace name to be in the create statement
+        String tableCreate = tableCql.replaceFirst("\\s+\"?"+tableName+"\"?\\s+", " \""+keyspaceName+"\".\""+tableName+"\" ");
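+        //e.g. CREATE TABLE tbl (...) becomes CREATE TABLE "ks"."tbl" (...)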
+
+        return new SchemaInsert(timer, settings, generator, seedManager, selectchance.get(), rowPopulation.get(), thriftInsertId, statement, tableCreate);
+    }
+
     public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
     {
         if (insertStatement == null)
@@ -562,12 +648,18 @@
             Set<ColumnMetadata> keyColumns = com.google.common.collect.Sets.newHashSet(tableMetaData.getPrimaryKey());
 
             for (ColumnMetadata metadata : tableMetaData.getPartitionKey())
-                partitionKeys.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+                partitionKeys.add(new ColumnInfo(metadata.getName(), metadata.getType().getName().toString(),
+                                                 metadata.getType().isCollection() ? metadata.getType().getTypeArguments().get(0).getName().toString() : "",
+                                                 columnConfigs.get(metadata.getName())));
             for (ColumnMetadata metadata : tableMetaData.getClusteringColumns())
-                clusteringColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+                clusteringColumns.add(new ColumnInfo(metadata.getName(), metadata.getType().getName().toString(),
+                                                     metadata.getType().isCollection() ? metadata.getType().getTypeArguments().get(0).getName().toString() : "",
+                                                     columnConfigs.get(metadata.getName())));
             for (ColumnMetadata metadata : tableMetaData.getColumns())
                 if (!keyColumns.contains(metadata))
-                    valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+                    valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType().getName().toString(),
+                                                    metadata.getType().isCollection() ? metadata.getType().getTypeArguments().get(0).getName().toString() : "",
+                                                    columnConfigs.get(metadata.getName())));
         }
 
         PartitionGenerator newGenerator(StressSettings settings)
@@ -587,66 +679,68 @@
     static class ColumnInfo
     {
         final String name;
-        final DataType type;
+        final String type;
+        final String collectionType;
         final GeneratorConfig config;
 
-        ColumnInfo(String name, DataType type, GeneratorConfig config)
+        ColumnInfo(String name, String type, String collectionType, GeneratorConfig config)
         {
             this.name = name;
             this.type = type;
+            this.collectionType = collectionType;
             this.config = config;
         }
 
         Generator getGenerator()
         {
-            return getGenerator(name, type, config);
+            return getGenerator(name, type, collectionType, config);
         }
 
-        static Generator getGenerator(final String name, final DataType type, GeneratorConfig config)
+        static Generator getGenerator(final String name, final String type, final String collectionType, GeneratorConfig config)
         {
-            switch (type.getName())
+            switch (type.toUpperCase())
             {
-                case ASCII:
-                case TEXT:
-                case VARCHAR:
+                case "ASCII":
+                case "TEXT":
+                case "VARCHAR":
                     return new Strings(name, config);
-                case BIGINT:
-                case COUNTER:
+                case "BIGINT":
+                case "COUNTER":
                     return new Longs(name, config);
-                case BLOB:
+                case "BLOB":
                     return new Bytes(name, config);
-                case BOOLEAN:
+                case "BOOLEAN":
                     return new Booleans(name, config);
-                case DECIMAL:
+                case "DECIMAL":
                     return new BigDecimals(name, config);
-                case DOUBLE:
+                case "DOUBLE":
                     return new Doubles(name, config);
-                case FLOAT:
+                case "FLOAT":
                     return new Floats(name, config);
-                case INET:
+                case "INET":
                     return new Inets(name, config);
-                case INT:
+                case "INT":
                     return new Integers(name, config);
-                case VARINT:
+                case "VARINT":
                     return new BigIntegers(name, config);
-                case TIMESTAMP:
+                case "TIMESTAMP":
                     return new Dates(name, config);
-                case UUID:
+                case "UUID":
                     return new UUIDs(name, config);
-                case TIMEUUID:
+                case "TIMEUUID":
                     return new TimeUUIDs(name, config);
-                case TINYINT:
+                case "TINYINT":
                     return new TinyInts(name, config);
-                case SMALLINT:
+                case "SMALLINT":
                     return new SmallInts(name, config);
-                case TIME:
+                case "TIME":
                     return new Times(name, config);
-                case DATE:
+                case "DATE":
                     return new LocalDates(name, config);
-                case SET:
-                    return new Sets(name, getGenerator(name, type.getTypeArguments().get(0), config), config);
-                case LIST:
-                    return new Lists(name, getGenerator(name, type.getTypeArguments().get(0), config), config);
+                case "SET":
+                    return new Sets(name, getGenerator(name, collectionType, null, config), config);
+                case "LIST":
+                    return new Lists(name, getGenerator(name, collectionType, null, config), config);
                 default:
                     throw new UnsupportedOperationException("Because of this name: "+name+" if you removed it from the yaml and are still seeing this, make sure to drop table");
             }
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
index a7297c5..1230065 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
@@ -22,10 +22,8 @@
 
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
 
@@ -71,7 +69,7 @@
         }
         this.maxRowCount = maxRowCount;
         this.minRowCount = minRowCount;
-        this.indexMap = new HashMap<>();
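+        //insertion order is preserved so getColumnNames() matches generator column order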
+        this.indexMap = new LinkedHashMap<>();
         int i = 0;
         for (Generator generator : partitionKey)
             indexMap.put(generator.name, --i);
@@ -110,4 +108,9 @@
             return clusteringComponents.get(c).type.compose(v);
         return valueComponents.get(c - clusteringComponents.size()).type.compose(v);
     }
+
+    public List<String> getColumnNames()
+    {
+        return indexMap.keySet().stream().collect(Collectors.toList());
+    }
 }
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
index d9fcac8..9eebce2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -21,15 +21,27 @@
  */
 
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Uninterruptibles;
 
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Statement;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.stress.WorkManager;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
@@ -39,12 +51,27 @@
 public class SchemaInsert extends SchemaStatement
 {
 
+    private final String tableSchema;
+    private final String insertStatement;
     private final BatchStatement.Type batchType;
 
     public SchemaInsert(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Distribution batchSize, RatioDistribution useRatio, RatioDistribution rowPopulation, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, BatchStatement.Type batchType)
     {
-        super(timer, settings, new DataSpec(generator, seedManager, batchSize, useRatio, rowPopulation), statement, thriftId, cl);
+        super(timer, settings, new DataSpec(generator, seedManager, batchSize, useRatio, rowPopulation), statement, statement.getVariables().asList().stream().map(d -> d.getName()).collect(Collectors.toList()), thriftId, cl);
         this.batchType = batchType;
+        this.insertStatement = null;
+        this.tableSchema = null;
+    }
+
+    /**
+     * Special constructor for offline use
+     */
+    public SchemaInsert(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, RatioDistribution useRatio, RatioDistribution rowPopulation, Integer thriftId, String statement, String tableSchema)
+    {
+        super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), useRatio, rowPopulation), null, generator.getColumnNames(), thriftId, ConsistencyLevel.ONE);
+        this.batchType = BatchStatement.Type.UNLOGGED;
+        this.insertStatement = statement;
+        this.tableSchema = tableSchema;
     }
 
     private class JavaDriverRun extends Runner
@@ -113,6 +140,31 @@
         }
     }
 
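+    /**
+     * Runner that writes generated rows straight to a CQLSSTableWriter instead of a live cluster.
+     */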
+    private class OfflineRun extends Runner
+    {
+        final CQLSSTableWriter writer;
+
+        OfflineRun(CQLSSTableWriter writer)
+        {
+            this.writer = writer;
+        }
+
+        public boolean run() throws Exception
+        {
+            for (PartitionIterator iterator : partitions)
+            {
+                while (iterator.hasNext())
+                {
+                    Row row = iterator.next();
+                    writer.rawAddRow(thriftRowArgs(row));
+                    rowCount += 1;
+                }
+            }
+
+            return true;
+        }
+    }
+
     @Override
     public void run(JavaDriverClient client) throws IOException
     {
@@ -130,4 +182,27 @@
         timeWithRetry(new ThriftRun(client));
     }
 
+    public CQLSSTableWriter createWriter(ColumnFamilyStore cfs, int bufferSize, boolean makeRangeAware)
+    {
+        return CQLSSTableWriter.builder()
+                               .withCfs(cfs)
+                               .withBufferSizeInMB(bufferSize)
+                               .forTable(tableSchema)
+                               .using(insertStatement)
+                               .rangeAware(makeRangeAware)
+                               .build();
+    }
+
+    public void runOffline(CQLSSTableWriter writer, WorkManager workManager) throws Exception
+    {
+        OfflineRun offline = new OfflineRun(writer);
+
+        while (true)
+        {
+            if (ready(workManager) == 0)
+                break;
+
+            offline.run();
+        }
+    }
 }
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
index 9b5c4ae..0d8e756 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
@@ -26,6 +26,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.stream.Collectors;
 
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
@@ -53,7 +54,8 @@
 
     public SchemaQuery(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, ArgSelect argSelect)
     {
-        super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, thriftId, cl);
+        super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement,
+              statement.getVariables().asList().stream().map(d -> d.getName()).collect(Collectors.toList()), thriftId, cl);
         this.argSelect = argSelect;
         randomBuffer = new Object[argumentIndex.length][argumentIndex.length];
     }
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index 2e03c69..c83787b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -38,7 +38,6 @@
 
 public abstract class SchemaStatement extends PartitionOperation
 {
-
     final PreparedStatement statement;
     final Integer thriftId;
     final ConsistencyLevel cl;
@@ -47,24 +46,27 @@
     final ColumnDefinitions definitions;
 
     public SchemaStatement(Timer timer, StressSettings settings, DataSpec spec,
-                           PreparedStatement statement, Integer thriftId, ConsistencyLevel cl)
+                           PreparedStatement statement, List<String> bindNames, Integer thriftId, ConsistencyLevel cl)
     {
         super(timer, settings, spec);
         this.statement = statement;
         this.thriftId = thriftId;
         this.cl = cl;
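+        //statement may be null for offline use; bind names then come from the partition generator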
-        argumentIndex = new int[statement.getVariables().size()];
+        argumentIndex = new int[bindNames.size()];
         bindBuffer = new Object[argumentIndex.length];
-        definitions = statement.getVariables();
+        definitions = statement != null ? statement.getVariables() : null;
         int i = 0;
-        for (ColumnDefinitions.Definition definition : definitions)
-            argumentIndex[i++] = spec.partitionGenerator.indexOf(definition.getName());
+        for (String name : bindNames)
+            argumentIndex[i++] = spec.partitionGenerator.indexOf(name);
 
-        statement.setConsistencyLevel(JavaDriverClient.from(cl));
+        if (statement != null)
+            statement.setConsistencyLevel(JavaDriverClient.from(cl));
     }
 
     BoundStatement bindRow(Row row)
     {
+        assert statement != null;
+
         for (int i = 0 ; i < argumentIndex.length ; i++)
         {
             Object value = row.get(argumentIndex[i]);