1) Adjust SingleSegmentLoader to allow for storing segments on multiple different mount points.  The specification language is really janky right now, so this is remaining a stealth feature for the time being.
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
index 0ee2a6e..5bbfd73 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
@@ -24,6 +24,10 @@
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.ServerView;
 import com.metamx.druid.coordination.DataSegmentAnnouncer;
+import com.metamx.druid.indexing.common.actions.TaskActionClient;
+import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
+import com.metamx.druid.indexing.common.config.TaskConfig;
+import com.metamx.druid.indexing.common.task.Task;
 import com.metamx.druid.loading.DataSegmentKiller;
 import com.metamx.druid.loading.DataSegmentPusher;
 import com.metamx.druid.loading.MMappedQueryableIndexFactory;
@@ -31,10 +35,6 @@
 import com.metamx.druid.loading.SegmentLoaderConfig;
 import com.metamx.druid.loading.SegmentLoadingException;
 import com.metamx.druid.loading.SingleSegmentLoader;
-import com.metamx.druid.indexing.common.actions.TaskActionClient;
-import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
-import com.metamx.druid.indexing.common.config.TaskConfig;
-import com.metamx.druid.indexing.common.task.Task;
 import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
 import com.metamx.emitter.service.ServiceEmitter;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@@ -141,9 +141,9 @@
         new SegmentLoaderConfig()
         {
           @Override
-          public File getCacheDirectory()
+          public String getCacheDirectory()
           {
-            return new File(getTaskWorkDir(), "fetched_segments");
+            return new File(getTaskWorkDir(), "fetched_segments").toString();
           }
         }
     );
diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java
index b821c65..306d7f4 100644
--- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java
@@ -39,9 +39,13 @@
   /**
    * Returns the last modified time of the given segment.
    *
+   * Note, this is not actually used at this point and doesn't need to actually be implemented.  It's just still here
+   * to not break compatibility.
+   *
    * @param segment The segment to check the last modified time for
    * @return the last modified time in millis from the epoch
    * @throws SegmentLoadingException if there are any errors
    */
+  @Deprecated
   public long getLastModified(DataSegment segment) throws SegmentLoadingException;
 }
diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
index bc44f82..e72bd78 100644
--- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
+++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
@@ -20,32 +20,14 @@
 package com.metamx.druid.loading;
 
 import com.google.common.base.Joiner;
-import com.metamx.common.MapUtils;
 import com.metamx.druid.client.DataSegment;
 
-import java.util.Map;
-
 /**
  */
 public class DataSegmentPusherUtil
 {
   private static final Joiner JOINER = Joiner.on("/").skipNulls();
 
-  public static String getLegacyStorageDir(DataSegment segment)
-  {
-    final Map<String,Object> loadSpec = segment.getLoadSpec();
-
-    String specType = MapUtils.getString(loadSpec, "type");
-    if (specType.startsWith("s3")) {
-      String s3Bucket = MapUtils.getString(loadSpec, "bucket");
-      String s3Path = MapUtils.getString(loadSpec, "key");
-
-      return String.format("%s/%s", s3Bucket, s3Path.substring(0, s3Path.lastIndexOf("/")));
-    }
-
-    return null;
-  }
-
   public static String getStorageDir(DataSegment segment)
   {
     return JOINER.join(
diff --git a/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java b/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java
index 294c91b..8a0e324 100644
--- a/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java
@@ -21,14 +21,18 @@
 
 import org.skife.config.Config;
 
-import java.io.File;
-
 /**
  */
 public abstract class SegmentLoaderConfig
 {
   @Config({"druid.paths.indexCache", "druid.segmentCache.path"})
-  public abstract File getCacheDirectory();
+  public abstract String getCacheDirectory();
+
+  @Config("druid.server.maxSize")
+  public long getServerMaxSize()
+  {
+    return Long.MAX_VALUE;
+  }
 
   @Config("druid.segmentCache.deleteOnRemove")
   public boolean deleteOnRemove()
diff --git a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
index 61e9986..f0e9a7f 100644
--- a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
+++ b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
@@ -19,9 +19,13 @@
 
 package com.metamx.druid.loading;
 
-import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
 import com.google.inject.Inject;
-import com.metamx.common.StreamUtils;
+import com.metamx.common.IAE;
+import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.index.QueryableIndex;
@@ -29,7 +33,11 @@
 import com.metamx.druid.index.Segment;
 import org.apache.commons.io.FileUtils;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
 /**
  */
@@ -39,8 +47,8 @@
 
   private final DataSegmentPuller dataSegmentPuller;
   private final QueryableIndexFactory factory;
-  private final SegmentLoaderConfig config;
-  private static final Joiner JOINER = Joiner.on("/").skipNulls();
+
+  private final List<StorageLocation> locations;
 
   @Inject
   public SingleSegmentLoader(
@@ -51,22 +59,52 @@
   {
     this.dataSegmentPuller = dataSegmentPuller;
     this.factory = factory;
-    this.config = config;
+
+    final ImmutableList.Builder<StorageLocation> locBuilder = ImmutableList.builder();
+
+    // This is a really, really stupid way of getting this information.  Splitting on commas and bars is error-prone
+    // We should instead switch it up to be a JSON Array of JSON Object or something and cool stuff like that
+    // But, that'll have to wait for some other day.
+    for (String dirSpec : config.getCacheDirectory().split(",")) {
+      String[] dirSplit = dirSpec.split("\\|");
+      if (dirSplit.length == 1) {
+        locBuilder.add(new StorageLocation(new File(dirSplit[0]), config.getServerMaxSize()));
+      }
+      else if (dirSplit.length == 2) {
+        final Long maxSize = Longs.tryParse(dirSplit[1]);
+        if (maxSize == null) {
+          throw new IAE("Size of a local segment storage location must be an integral number, got[%s]", dirSplit[1]);
+        }
+        locBuilder.add(new StorageLocation(new File(dirSplit[0]), maxSize));
+      }
+      else {
+        throw new ISE(
+            "Unknown segment storage location[%s]=>[%s], config[%s].",
+            dirSplit.length, dirSpec, config.getCacheDirectory()
+        );
+      }
+    }
+    locations = locBuilder.build();
+
+    Preconditions.checkArgument(locations.size() > 0, "Must have at least one segment cache directory.");
+    log.info("Using storage locations[%s]", locations);
   }
 
   @Override
   public boolean isSegmentLoaded(final DataSegment segment)
   {
-    File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
-    if (localStorageDir.exists()) {
-      return true;
-    }
+    return findStorageLocationIfLoaded(segment) != null;
+  }
 
-    final File legacyStorageDir = new File(
-        config.getCacheDirectory(),
-        DataSegmentPusherUtil.getLegacyStorageDir(segment)
-    );
-    return legacyStorageDir.exists();
+  public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
+  {
+    for (StorageLocation location : locations) {
+      File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+      if (localStorageDir.exists()) {
+        return location;
+      }
+    }
+    return null;
   }
 
   @Override
@@ -80,111 +118,129 @@
 
   public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
   {
-    File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
+    StorageLocation loc = findStorageLocationIfLoaded(segment);
 
-    final String legacyDir = DataSegmentPusherUtil.getLegacyStorageDir(segment);
-    if (legacyDir != null) {
-      File legacyStorageDir = new File(config.getCacheDirectory(), legacyDir);
+    final File retVal;
 
-      if (legacyStorageDir.exists()) {
-        log.info("Found legacyStorageDir[%s], moving to new storage location[%s]", legacyStorageDir, localStorageDir);
-        if (localStorageDir.exists()) {
-          try {
-            FileUtils.deleteDirectory(localStorageDir);
-          }
-          catch (IOException e) {
-            throw new SegmentLoadingException(e, "Error deleting localDir[%s]", localStorageDir);
-          }
-        }
-        final File parentDir = localStorageDir.getParentFile();
-        if (!parentDir.exists()) {
-          log.info("Parent[%s] didn't exist, creating.", parentDir);
-          if (!parentDir.mkdirs()) {
-            log.warn("Unable to make parentDir[%s]", parentDir);
-          }
-        }
-
-        if (!legacyStorageDir.renameTo(localStorageDir)) {
-          log.warn("Failed moving [%s] to [%s]", legacyStorageDir, localStorageDir);
-        }
+    if (loc == null) {
+      Iterator<StorageLocation> locIter = locations.iterator();
+      loc = locIter.next();
+      while (locIter.hasNext()) {
+        loc = loc.mostEmpty(locIter.next());
       }
-    }
 
-    if (localStorageDir.exists()) {
-      long localLastModified = localStorageDir.lastModified();
-      long remoteLastModified = dataSegmentPuller.getLastModified(segment);
-      if (remoteLastModified > 0 && localLastModified >= remoteLastModified) {
-        log.info(
-            "Found localStorageDir[%s] with modified[%s], which is same or after remote[%s].  Using.",
-            localStorageDir, localLastModified, remoteLastModified
-        );
-        return localStorageDir;
-      }
-    }
-
-    if (localStorageDir.exists()) {
-      try {
-        FileUtils.deleteDirectory(localStorageDir);
-      }
-      catch (IOException e) {
-        log.warn(e, "Exception deleting previously existing local dir[%s]", localStorageDir);
-      }
-    }
-    if (!localStorageDir.mkdirs()) {
-      log.info("Unable to make parent file[%s]", localStorageDir);
-    }
-
-    dataSegmentPuller.getSegmentFiles(segment, localStorageDir);
-
-    return localStorageDir;
-  }
-
-  private File getLocalStorageDir(DataSegment segment)
-  {
-    String outputKey = JOINER.join(
-        segment.getDataSource(),
-        String.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
-        segment.getVersion(),
-        segment.getShardSpec().getPartitionNum()
-    );
-
-    return new File(config.getCacheDirectory(), outputKey);
-  }
-
-  private void moveToCache(File pulledFile, File cacheFile) throws SegmentLoadingException
-  {
-    log.info("Rename pulledFile[%s] to cacheFile[%s]", pulledFile, cacheFile);
-    if (!pulledFile.renameTo(cacheFile)) {
-      log.warn("Error renaming pulledFile[%s] to cacheFile[%s].  Copying instead.", pulledFile, cacheFile);
-
-      try {
-        StreamUtils.copyToFileAndClose(new FileInputStream(pulledFile), cacheFile);
-      }
-      catch (IOException e) {
-        throw new SegmentLoadingException(
-            e,
-            "Problem moving pulledFile[%s] to cache[%s]",
-            pulledFile,
-            cacheFile
+      if (!loc.canHandle(segment.getSize())) {
+        throw new ISE(
+            "Segment[%s:%,d] too large for storage[%s:%,d].",
+            segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available()
         );
       }
-      if (!pulledFile.delete()) {
-        log.error("Could not delete pulledFile[%s].", pulledFile);
+
+      File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+      if (!storageDir.mkdirs()) {
+        log.debug("Unable to make parent file[%s]", storageDir);
       }
+
+      dataSegmentPuller.getSegmentFiles(segment, storageDir);
+      loc.addSegment(segment);
+
+      retVal = storageDir;
     }
+    else {
+      retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+    }
+
+    loc.addSegment(segment);
+
+    return retVal;
   }
 
   @Override
   public void cleanup(DataSegment segment) throws SegmentLoadingException
   {
-    File cacheFile = getLocalStorageDir(segment);
+    StorageLocation loc = findStorageLocationIfLoaded(segment);
+
+    if (loc == null) {
+      log.info("Asked to cleanup something[%s] that didn't exist.  Skipping.", segment);
+      return;
+    }
 
     try {
+      File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
       log.info("Deleting directory[%s]", cacheFile);
       FileUtils.deleteDirectory(cacheFile);
+      loc.removeSegment(segment);
     }
     catch (IOException e) {
       throw new SegmentLoadingException(e, e.getMessage());
     }
   }
+
+  private static class StorageLocation
+  {
+    private final File path;
+    private final long maxSize;
+    private final Set<DataSegment> segments;
+
+    private volatile long currSize = 0;
+
+    StorageLocation(
+        File path,
+        long maxSize
+    )
+    {
+      this.path = path;
+      this.maxSize = maxSize;
+
+      this.segments = Sets.newHashSet();
+    }
+
+    private File getPath()
+    {
+      return path;
+    }
+
+    private Long getMaxSize()
+    {
+      return maxSize;
+    }
+
+    private synchronized void addSegment(DataSegment segment)
+    {
+      if (! segments.add(segment)) {
+        currSize += segment.getSize();
+      }
+    }
+
+    private synchronized void removeSegment(DataSegment segment)
+    {
+      if (segments.remove(segment)) {
+        currSize -= segment.getSize();
+      }
+    }
+
+    private boolean canHandle(long size)
+    {
+      return available() > size;
+    }
+
+    private synchronized long available()
+    {
+      return maxSize - currSize;
+    }
+
+    private StorageLocation mostEmpty(StorageLocation other)
+    {
+      return available() > other.available() ? this : other;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "StorageLocation{" +
+             "path=" + path +
+             ", maxSize=" + maxSize +
+             '}';
+    }
+  }
 }