DL-157: resource placement for write proxy
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
index 87d3b53..5b04a05 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
@@ -85,6 +85,7 @@
     boolean enableBatching = false;
     int batchBufferSize = 256 * 1024;
     int batchFlushIntervalMicros = 2000;
+    String routingServiceFinagleNameString;
 
     final DistributedLogConfiguration conf = new DistributedLogConfiguration();
     final StatsReceiver statsReceiver = new OstrichStatsReceiver();
@@ -125,6 +126,7 @@
         options.addOption("bt", "enable-batch", false, "Enable batching on writers");
         options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in bytes");
         options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros");
+        options.addOption("rs", "routing-service", true, "The routing service finagle name for server-side routing");
         options.addOption("h", "help", false, "Print usage.");
     }
 
@@ -221,6 +223,9 @@
         if (cmdline.hasOption("rb")) {
             recvBufferSize = Integer.parseInt(cmdline.getOptionValue("rb"));
         }
+        if (cmdline.hasOption("rs")) {
+            routingServiceFinagleNameString = cmdline.getOptionValue("rs");
+        }
         thriftmux = cmdline.hasOption("mx");
         handshakeWithClientInfo = cmdline.hasOption("hsci");
         readFromHead = cmdline.hasOption("rfh");
@@ -311,7 +316,8 @@
                 recvBufferSize,
                 enableBatching,
                 batchBufferSize,
-                batchFlushIntervalMicros);
+                batchFlushIntervalMicros,
+                routingServiceFinagleNameString);
     }
 
     protected WriterWorker createWriteWorker(
@@ -335,7 +341,8 @@
             int recvBufferSize,
             boolean enableBatching,
             int batchBufferSize,
-            int batchFlushIntervalMicros) {
+            int batchFlushIntervalMicros,
+            String routingServiceFinagleNameString) {
         return new WriterWorker(
                 streamPrefix,
                 uri,
@@ -357,7 +364,8 @@
                 recvBufferSize,
                 enableBatching,
                 batchBufferSize,
-                batchFlushIntervalMicros);
+                batchFlushIntervalMicros,
+                routingServiceFinagleNameString);
     }
 
     Worker runDLWriter() throws IOException {
@@ -453,7 +461,7 @@
         try {
             benchmarker.run();
         } catch (Exception e) {
-            logger.info("Benchmark quitted due to : ", e);
+            logger.info("Benchmark quit due to : ", e);
         }
     }
 
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
index 46229b3..dc5a6e2 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
@@ -81,6 +81,7 @@
     final boolean enableBatching;
     final int batchBufferSize;
     final int batchFlushIntervalMicros;
+    private final String routingServiceFinagleName;
 
     volatile boolean running = true;
 
@@ -113,7 +114,8 @@
                         int recvBufferSize,
                         boolean enableBatching,
                         int batchBufferSize,
-                        int batchFlushIntervalMicros) {
+                        int batchFlushIntervalMicros,
+                        String routingServiceFinagleName) {
         checkArgument(startStreamId <= endStreamId);
         checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty());
         this.streamPrefix = streamPrefix;
@@ -143,6 +145,7 @@
         this.finagleNames = finagleNames;
         this.serverSets = createServerSets(serverSetPaths);
         this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        this.routingServiceFinagleName = routingServiceFinagleName;
 
         // Streams
         streamNames = new ArrayList<String>(endStreamId - startStreamId);
@@ -197,6 +200,7 @@
             .periodicOwnershipSyncIntervalMs(TimeUnit.MINUTES.toMillis(5))
             .periodicDumpOwnershipCache(true)
             .handshakeTracing(true)
+            .serverRoutingServiceFinagleNameStr(routingServiceFinagleName)
             .name("writer");
 
         if (!finagleNames.isEmpty()) {
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
index 634afe1..1077cd0 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
@@ -73,6 +73,7 @@
 import com.twitter.util.Return;
 import com.twitter.util.Throw;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -852,18 +853,18 @@
         }
     }
 
-    private void retryGetOwnerFromRoutingServer(final StreamOp op,
+    private void retryGetOwnerFromResourcePlacementServer(final StreamOp op,
                                                 final Promise<SocketAddress> getOwnerPromise,
                                                 final Throwable cause) {
         if (op.shouldTimeout()) {
             op.fail(null, cause);
             return;
         }
-        getOwnerFromRoutingServer(op, getOwnerPromise);
+        getOwnerFromResourcePlacementServer(op, getOwnerPromise);
     }
 
-    private void getOwnerFromRoutingServer(final StreamOp op,
-                                           final Promise<SocketAddress> getOwnerPromise) {
+    private void getOwnerFromResourcePlacementServer(final StreamOp op,
+                                                     final Promise<SocketAddress> getOwnerPromise) {
         clusterClient.get().getService().getOwner(op.stream, op.ctx)
             .addEventListener(new FutureEventListener<WriteResponse>() {
                 @Override
@@ -875,18 +876,20 @@
                 public void onSuccess(WriteResponse value) {
                     if (StatusCode.FOUND == value.getHeader().getCode()
                           && null != value.getHeader().getLocation()) {
-                        SocketAddress addr;
                         try {
-                             addr = DLSocketAddress.deserialize(value.getHeader().getLocation()).getSocketAddress();
+                            InetSocketAddress addr = DLSocketAddress.deserialize(
+                                value.getHeader().getLocation()
+                            ).getSocketAddress();
+                            getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
                         } catch (IOException e) {
                             // retry from the routing server again
-                            retryGetOwnerFromRoutingServer(op, getOwnerPromise, e);
+                            logger.error("ERROR in getOwner", e);
+                            retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, e);
                             return;
                         }
-                        getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
                     } else {
                         // retry from the routing server again
-                        retryGetOwnerFromRoutingServer(op, getOwnerPromise,
+                        retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise,
                                 new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown"));
                     }
                 }
@@ -896,7 +899,7 @@
     private Future<SocketAddress> getOwner(final StreamOp op) {
         if (clusterClient.isPresent()) {
             final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>();
-            getOwnerFromRoutingServer(op, getOwnerPromise);
+            getOwnerFromResourcePlacementServer(op, getOwnerPromise);
             return getOwnerPromise;
         }
         // pickup host by hashing
@@ -1190,7 +1193,7 @@
             ownershipCache.updateOwner(stream, ownerAddr);
         } catch (IOException e) {
             logger.warn("Invalid ownership {} found for stream {} : ",
-                        new Object[] { location, stream, e });
+                new Object[] { location, stream, e });
         }
     }
 
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
index 44d93ee..3f65aff 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
@@ -96,6 +96,7 @@
         newBuilder.statsReceiver = builder.statsReceiver;
         newBuilder.streamStatsReceiver = builder.streamStatsReceiver;
         newBuilder.enableRegionStats = builder.enableRegionStats;
+        newBuilder.serverRoutingServiceFinagleName = builder.serverRoutingServiceFinagleName;
         newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig);
         return newBuilder;
     }
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 2c9fe44..e7f29cc 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -609,10 +609,10 @@
         return rootPath;
     }
 
-    private static ZooKeeperClientBuilder createDLZKClientBuilder(String zkcName,
-                                                                  DistributedLogConfiguration conf,
-                                                                  String zkServers,
-                                                                  StatsLogger statsLogger) {
+    public static ZooKeeperClientBuilder createDLZKClientBuilder(String zkcName,
+                                                                 DistributedLogConfiguration conf,
+                                                                 String zkServers,
+                                                                 StatsLogger statsLogger) {
         RetryPolicy retryPolicy = null;
         if (conf.getZKNumRetries() > 0) {
             retryPolicy = new BoundExponentialBackoffRetryPolicy(
diff --git a/distributedlog-service/pom.xml b/distributedlog-service/pom.xml
index b7b6ff8..e74d486 100644
--- a/distributedlog-service/pom.xml
+++ b/distributedlog-service/pom.xml
@@ -117,10 +117,49 @@
       <artifactId>jetty-servlet</artifactId>
       <version>${jetty.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.5.0-1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>scrooge-core_2.11</artifactId>
+      <version>${scrooge.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <version>2.2.0-incubating</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
       <plugin>
+        <groupId>com.twitter</groupId>
+        <artifactId>scrooge-maven-plugin</artifactId>
+        <version>${scrooge-maven-plugin.version}</version>
+        <configuration>
+          <language>java</language>
+        </configuration>
+        <executions>
+          <execution>
+            <id>thrift-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
         <version>2.2.1</version>
         <configuration>
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
index 0ce335b..3225ced 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
@@ -22,6 +22,7 @@
 import com.twitter.distributedlog.client.routing.SingleHostRoutingService;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.DLMetadata;
+import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
 import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
 import com.twitter.finagle.builder.Server;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -228,9 +229,11 @@
                             routingService,
                             new NullStatsProvider(),
                             proxyPort,
-                            thriftmux);
+                            thriftmux,
+                            new EqualLoadAppraiser());
                     routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
                     routingService.startService();
+                    serverPair.getLeft().startPlacementPolicy();
                     success = true;
                 } catch (BindException be) {
                     retries++;
@@ -244,7 +247,7 @@
                 }
             }
 
-            LOG.info("Runnning DL on port {}", proxyPort);
+            LOG.info("Running DL on port {}", proxyPort);
 
             dlServer = serverPair;
             address = DLSocketAddress.getSocketAddress(proxyPort);
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
index 185ea82..a9ba125 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
@@ -17,8 +17,32 @@
  */
 package com.twitter.distributedlog.service;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.Tuple2;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.config.DynamicConfigurationFactory;
@@ -31,6 +55,8 @@
 import com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.service.config.ServiceStreamConfigProvider;
 import com.twitter.distributedlog.service.config.StreamConfigProvider;
+import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
+import com.twitter.distributedlog.service.placement.LoadAppraiser;
 import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.thrift.service.DistributedLogService;
@@ -46,31 +72,11 @@
 import com.twitter.finagle.thrift.ThriftServerFramedCodec;
 import com.twitter.finagle.transport.Transport;
 import com.twitter.util.Duration;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Tuple2;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 public class DistributedLogServer {
 
     static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
+    private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName();
 
     private DistributedLogServiceImpl dlService = null;
     private Server server = null;
@@ -89,6 +95,7 @@
     private final Optional<Integer> statsPort;
     private final Optional<Integer> shardId;
     private final Optional<Boolean> announceServerSet;
+    private final Optional<String> loadAppraiserClassStr;
     private final Optional<Boolean> thriftmux;
 
     DistributedLogServer(Optional<String> uri,
@@ -98,6 +105,7 @@
                          Optional<Integer> statsPort,
                          Optional<Integer> shardId,
                          Optional<Boolean> announceServerSet,
+                         Optional<String> loadAppraiserClass,
                          Optional<Boolean> thriftmux,
                          RoutingService routingService,
                          StatsReceiver statsReceiver,
@@ -113,9 +121,10 @@
         this.routingService = routingService;
         this.statsReceiver = statsReceiver;
         this.statsProvider = statsProvider;
+        this.loadAppraiserClassStr = loadAppraiserClass;
     }
 
-    public void runServer() throws ConfigurationException, IllegalArgumentException, IOException {
+    public void runServer() throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
         if (!uri.isPresent()) {
             throw new IllegalArgumentException("No distributedlog uri provided.");
         }
@@ -174,6 +183,9 @@
                     IdentityStreamPartitionConverter.class.getName());
             converter = new IdentityStreamPartitionConverter();
         }
+        Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER));
+        LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
+        logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get() + " Instantiated " + loadAppraiser.getClass().getCanonicalName());
 
         StreamConfigProvider streamConfProvider =
                 getStreamConfigProvider(dlConf, converter);
@@ -193,7 +205,8 @@
                 keepAliveLatch,
                 statsReceiver,
                 thriftmux.isPresent(),
-                streamConfProvider);
+                streamConfProvider,
+                loadAppraiser);
 
         this.dlService = serverPair.getLeft();
         this.server = serverPair.getRight();
@@ -203,6 +216,8 @@
         // start the routing service after announced
         routingService.startService();
         logger.info("Started the routing service.");
+        dlService.startPlacementPolicy();
+        logger.info("Started the placement policy.");
     }
 
     protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) {
@@ -256,7 +271,8 @@
             RoutingService routingService,
             StatsProvider provider,
             int port,
-            boolean thriftmux) throws IOException {
+            boolean thriftmux,
+            LoadAppraiser loadAppraiser) throws IOException {
 
         return runServer(serverConf,
                 dlConf,
@@ -269,7 +285,8 @@
                 new CountDownLatch(0),
                 new NullStatsReceiver(),
                 thriftmux,
-                new NullStreamConfigProvider());
+                new NullStreamConfigProvider(),
+                loadAppraiser);
     }
 
     static Pair<DistributedLogServiceImpl, Server> runServer(
@@ -284,7 +301,8 @@
             CountDownLatch keepAliveLatch,
             StatsReceiver statsReceiver,
             boolean thriftmux,
-            StreamConfigProvider streamConfProvider) throws IOException {
+            StreamConfigProvider streamConfProvider,
+            LoadAppraiser loadAppraiser) throws IOException {
         logger.info("Running server @ uri {}.", dlUri);
 
         boolean perStreamStatsEnabled = serverConf.isPerStreamStatEnabled();
@@ -297,16 +315,17 @@
 
         // dl service
         DistributedLogServiceImpl dlService = new DistributedLogServiceImpl(
-                serverConf,
-                dlConf,
-                dynDlConf,
-                streamConfProvider,
-                dlUri,
-                partitionConverter,
-                routingService,
-                provider.getStatsLogger(""),
-                perStreamStatsLogger,
-                keepAliveLatch);
+            serverConf,
+            dlConf,
+            dynDlConf,
+            streamConfProvider,
+            dlUri,
+            partitionConverter,
+            routingService,
+            provider.getStatsLogger(""),
+            perStreamStatsLogger,
+            keepAliveLatch,
+            loadAppraiser);
 
         StatsReceiver serviceStatsReceiver = statsReceiver.scope("service");
         StatsLogger serviceStatsLogger = provider.getStatsLogger("service");
@@ -400,6 +419,7 @@
      * @throws ConfigurationException
      * @throws IllegalArgumentException
      * @throws IOException
+     * @throws ClassNotFoundException
      */
     public static DistributedLogServer runServer(
                Optional<String> uri,
@@ -409,11 +429,12 @@
                Optional<Integer> statsPort,
                Optional<Integer> shardId,
                Optional<Boolean> announceServerSet,
+               Optional<String> loadAppraiserClass,
                Optional<Boolean> thriftmux,
                RoutingService routingService,
                StatsReceiver statsReceiver,
                StatsProvider statsProvider)
-            throws ConfigurationException, IllegalArgumentException, IOException {
+        throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
 
         final DistributedLogServer server = new DistributedLogServer(
                 uri,
@@ -423,6 +444,7 @@
                 statsPort,
                 shardId,
                 announceServerSet,
+                loadAppraiserClass,
                 thriftmux,
                 routingService,
                 statsReceiver,
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
index af36307..1c3d8d4 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
@@ -68,6 +68,7 @@
         options.addOption("pd", "stats-provider", true, "DistributedLog Stats Provider");
         options.addOption("si", "shard-id", true, "DistributedLog Shard ID");
         options.addOption("a", "announce", false, "ServerSet Path to Announce");
+        options.addOption("la", "load-appraiser", true, "LoadAppraiser Implementation to Use");
         options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
     }
 
@@ -97,10 +98,13 @@
         } catch (IOException ie) {
             logger.error("Failed to start distributedlog server : ", ie);
             Runtime.getRuntime().exit(-1);
+        } catch (ClassNotFoundException cnf) {
+          logger.error("Failed to start distributedlog server : ", cnf);
+          Runtime.getRuntime().exit(-1);
         }
     }
 
-    private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException {
+    private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
         final StatsReceiver statsReceiver = NullStatsReceiver.get();
         Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
         DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
@@ -142,6 +146,7 @@
                 getOptionalIntegerArg(cmdline, "sp"),
                 getOptionalIntegerArg(cmdline, "si"),
                 getOptionalBooleanArg(cmdline, "a"),
+                getOptionalStringArg(cmdline, "la"),
                 getOptionalBooleanArg(cmdline, "mx"),
                 routingService,
                 statsReceiver,
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 5c5b5af..e7974c7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -21,6 +21,7 @@
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.net.InetSocketAddressHelper;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.acl.AccessControlManager;
@@ -36,8 +37,14 @@
 import com.twitter.distributedlog.feature.AbstractFeatureProvider;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.rate.MovingAverageRate;
+import com.twitter.distributedlog.rate.MovingAverageRateFactory;
 import com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.service.config.StreamConfigProvider;
+import com.twitter.distributedlog.service.placement.LeastLoadPlacementPolicy;
+import com.twitter.distributedlog.service.placement.LoadAppraiser;
+import com.twitter.distributedlog.service.placement.PlacementPolicy;
+import com.twitter.distributedlog.service.placement.ZKPlacementStateManager;
 import com.twitter.distributedlog.service.stream.BulkWriteOp;
 import com.twitter.distributedlog.service.stream.DeleteOp;
 import com.twitter.distributedlog.service.stream.admin.CreateOp;
@@ -67,32 +74,19 @@
 import com.twitter.distributedlog.thrift.service.StatusCode;
 import com.twitter.distributedlog.thrift.service.WriteContext;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
-import com.twitter.distributedlog.rate.MovingAverageRateFactory;
-import com.twitter.distributedlog.rate.MovingAverageRate;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.finagle.NoBrokersAvailableException;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
+import com.twitter.util.Function;
 import com.twitter.util.Function0;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
-import com.twitter.util.Timer;
 import com.twitter.util.ScheduledThreadPoolTimer;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
+import com.twitter.util.Timer;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -102,6 +96,17 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.runtime.BoxedUnit;
+
 public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface,
                                                   FatalErrorHandler {
 
@@ -113,6 +118,7 @@
     private final DistributedLogConfiguration dlConfig;
     private final DistributedLogNamespace dlNamespace;
     private final int serverRegionId;
+    private final PlacementPolicy placementPolicy;
     private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT;
     private final ReentrantReadWriteLock closeLock =
             new ReentrantReadWriteLock();
@@ -157,6 +163,7 @@
     private final Gauge<Number> movingAvgBpsGauge;
     private final Gauge<Number> streamAcquiredGauge;
     private final Gauge<Number> streamCachedGauge;
+    private final int shard;
 
     DistributedLogServiceImpl(ServerConfiguration serverConf,
                               DistributedLogConfiguration dlConf,
@@ -167,7 +174,8 @@
                               RoutingService routingService,
                               StatsLogger statsLogger,
                               StatsLogger perStreamStatsLogger,
-                              CountDownLatch keepAliveLatch)
+                              CountDownLatch keepAliveLatch,
+                              LoadAppraiser loadAppraiser)
             throws IOException {
         // Configuration.
         this.serverConfig = serverConf;
@@ -177,7 +185,7 @@
         this.serverRegionId = serverConf.getRegionId();
         this.streamPartitionConverter = converter;
         int serverPort = serverConf.getServerPort();
-        int shard = serverConf.getServerShardId();
+        this.shard = serverConf.getServerShardId();
         int numThreads = serverConf.getServerThreads();
         this.clientId = DLSocketAddress.toLockId(DLSocketAddress.getSocketAddress(serverPort), shard);
         String allocatorPoolName = ServerUtils.getLedgerAllocatorPoolName(
@@ -264,6 +272,15 @@
                 streamManager,
                 limiterDisabledFeature);
 
+        this.placementPolicy = new LeastLoadPlacementPolicy(
+            loadAppraiser,
+            routingService,
+            dlNamespace,
+            new ZKPlacementStateManager(uri, dlConf, statsLogger),
+            Duration.fromSeconds(serverConf.getResourcePlacementRefreshInterval()),
+            statsLogger);
+        logger.info("placement started");
+
         // Stats
         this.statsLogger = statsLogger;
 
@@ -501,35 +518,13 @@
             return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId)));
         }
 
-        Stream stream = streamManager.getStream(streamName);
-        String owner;
-        if (null != stream && null != (owner = stream.getOwner())) {
-            return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(owner)));
-        }
-
-        RoutingService.RoutingContext routingContext = RoutingService.RoutingContext.of(regionResolver);
-
-        if (ctx.isSetTriedHosts()) {
-            for (String triedHost : ctx.getTriedHosts()) {
-                routingContext.addTriedHost(
-                        DLSocketAddress.parseSocketAddress(triedHost), StatusCode.STREAM_UNAVAILABLE);
+        return placementPolicy.placeStream(streamName).map(new Function<String, WriteResponse>() {
+            @Override
+            public WriteResponse apply(String server) {
+                String host = DLSocketAddress.toLockId(InetSocketAddressHelper.parse(server), -1);
+                return new WriteResponse(ResponseUtils.ownerToHeader(host));
             }
-        }
-
-        try {
-            SocketAddress host = routingService.getHost(streamName, routingContext);
-            if (host instanceof InetSocketAddress) {
-                // use shard id '-1' as the shard id here won't be used for redirection
-                return Future.value(new WriteResponse(
-                        ResponseUtils.ownerToHeader(DLSocketAddress.toLockId((InetSocketAddress) host, -1))));
-            } else {
-                return Future.value(new WriteResponse(
-                        ResponseUtils.streamUnavailableHeader()));
-            }
-        } catch (NoBrokersAvailableException e) {
-            return Future.value(new WriteResponse(
-                    ResponseUtils.streamUnavailableHeader()));
-        }
+        });
     }
 
 
@@ -689,6 +684,7 @@
 
             // Stop the timer.
             timer.stop();
+            placementPolicy.close();
 
             // clean up gauge
             unregisterGauge();
@@ -704,6 +700,10 @@
         }
     }
 
+    protected void startPlacementPolicy() {
+        this.placementPolicy.start(shard == 0);
+    }
+
     @Override
     public void notifyFatalError() {
         triggerShutdown();
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
index 9a9e83c..5b19f6c 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
@@ -95,6 +95,9 @@
     protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME
         = "server_use_hostname_as_allocator_pool_name";
     protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false;
+    //Configure refresh interval for calculating resource placement in seconds
+    public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S = "server_resource_placement_refresh_interval_sec";
+    public static final int  SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120;
 
     public ServerConfiguration() {
         super();
@@ -399,6 +402,15 @@
             SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT);
     }
 
+    public ServerConfiguration setResourcePlacementRefreshInterval(int refreshIntervalSecs) {
+        setProperty(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, refreshIntervalSecs);
+        return this;
+    }
+
+    public int getResourcePlacementRefreshInterval() {
+        return getInt(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT);
+    }
+
     /**
      * Validate the configuration
      */
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
new file mode 100644
index 0000000..144e358
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import com.twitter.util.Future;
+
+/**
+ * Created for those who hold these truths to be self-evident, that all streams are created equal,
+ * that they are endowed by their creator with certain unalienable loads, that among these are
+ * Uno, Eins, and One.
+ */
+public class EqualLoadAppraiser implements LoadAppraiser {
+  @Override
+  public Future<StreamLoad> getStreamLoad(String stream) {
+    return Future.value(new StreamLoad(stream, 1));
+  }
+
+  @Override
+  public Future<Void> refreshCache() {
+    return Future.value(null);
+  }
+}
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
new file mode 100644
index 0000000..e4c8128
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import scala.Function1;
+import scala.runtime.BoxedUnit;
+
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.Stats;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
+
+/**
+ * A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as
+ * evenly as possible across all shards. The LoadAppraiser remains responsible for determining what
+ * the load of a server would be. This placement policy then distributes these streams across the
+ * servers.
+ */
+public class LeastLoadPlacementPolicy extends PlacementPolicy {
+  private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+  private Map<String, String> streamToServer = new HashMap<String, String>();
+
+  public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+                                  DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                                  Duration refreshInterval, StatsLogger statsLogger) {
+    super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
+    statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
+      @Override
+      public Number getDefaultValue() {
+        return 0;
+      }
+
+      @Override
+      public Number getSample() {
+        if (serverLoads.size() > 0) {
+          return serverLoads.last().getLoad() - serverLoads.first().getLoad();
+        } else {
+          return getDefaultValue();
+        }
+      }
+    });
+  }
+
+  @Override
+  public Future<String> placeStream(String stream) {
+    if (streamToServer.containsKey(stream)) {
+      return Future.value(streamToServer.get(stream));
+    }
+    Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
+    return streamLoadFuture.map(new Function<StreamLoad, String>() {
+      @Override
+      public String apply(StreamLoad streamLoad) {
+        return placeStreamSynchronized(streamLoad);
+      }
+    });
+  }
+
+  synchronized private String placeStreamSynchronized(StreamLoad streamLoad) {
+    ServerLoad serverLoad = serverLoads.pollFirst();
+    serverLoad.addStream(streamLoad);
+    serverLoads.add(serverLoad);
+    return serverLoad.getServer();
+  }
+
+  @Override
+  public void refresh() {
+    logger.info("Refreshing server loads.");
+    Future<Void> refresh = loadAppraiser.refreshCache();
+    final Set<String> servers = getServers();
+    final Set<String> allStreams = getStreams();
+    Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(new Function<Void, Future<TreeSet<ServerLoad>>>() {
+      @Override
+      public Future<TreeSet<ServerLoad>> apply(Void v1) {
+        return calculate(servers, allStreams);
+      }
+    });
+    serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+      @Override
+      public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+        try {
+          updateServerLoads(serverLoads);
+        } catch (PlacementStateManager.StateManagerSaveException e) {
+          logger.error("The refreshed mapping could not be persisted and will not be used.", e);
+        }
+        return BoxedUnit.UNIT;
+      }
+    });
+  }
+
+  synchronized private void updateServerLoads(TreeSet<ServerLoad> serverLoads) throws PlacementStateManager.StateManagerSaveException {
+    this.placementStateManager.saveOwnership(serverLoads);
+    this.streamToServer = serverLoadsToMap(serverLoads);
+    this.serverLoads = serverLoads;
+  }
+
+  @Override
+  synchronized public void load(TreeSet<ServerLoad> serverLoads) {
+    this.serverLoads = serverLoads;
+    this.streamToServer = serverLoadsToMap(serverLoads);
+  }
+
+  public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
+    logger.info("Calculating server loads");
+    final long startTime = System.currentTimeMillis();
+    ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
+
+    for (String stream: streams) {
+      Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
+      futures.add(streamLoad);
+    }
+
+    return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
+      @Override
+      public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
+        /* Sort streamLoads so largest streams are placed first for better balance */
+        TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
+        for (StreamLoad streamLoad: streamLoads) {
+          streamQueue.add(streamLoad);
+        }
+
+        TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+        for (String server: servers) {
+          ServerLoad serverLoad = new ServerLoad(server);
+          if (!streamQueue.isEmpty()) {
+            serverLoad.addStream(streamQueue.pollFirst());
+          }
+          serverLoads.add(serverLoad);
+        }
+
+        while (!streamQueue.isEmpty()) {
+          ServerLoad serverLoad = serverLoads.pollFirst();
+          serverLoad.addStream(streamQueue.pollFirst());
+          serverLoads.add(serverLoad);
+        }
+        return serverLoads;
+      }
+    }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+      @Override
+      public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+        placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
+        return BoxedUnit.UNIT;
+      }
+    }).onFailure(new Function<Throwable, BoxedUnit>() {
+      @Override
+      public BoxedUnit apply(Throwable t) {
+        logger.error("Failure calculating loads", t);
+        placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
+        return BoxedUnit.UNIT;
+      }
+    });
+  }
+
+  private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
+    HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
+    for (ServerLoad serverLoad: serverLoads) {
+      for (StreamLoad streamLoad: serverLoad.getStreamLoads()) {
+        streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
+      }
+    }
+    return streamToServer;
+  }
+}
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
new file mode 100644
index 0000000..784f106
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import com.twitter.util.Future;
+
+public interface LoadAppraiser {
+  Future<StreamLoad> getStreamLoad(String stream);
+  Future<Void> refreshCache();
+}
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
new file mode 100644
index 0000000..2044428
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+
+import scala.runtime.BoxedUnit;
+
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.service.DLSocketAddress;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Time;
+import com.twitter.util.Timer;
+
+/**
+ * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream
+ * contains. The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
+ * then distributed these StreamLoads to the available servers in a manner defined by the
+ * implementation creating ServerLoad objects. It then saves this assignment via the
+ * PlacementStateManager.
+ */
+public abstract class PlacementPolicy {
+  protected final LoadAppraiser loadAppraiser;
+  protected final RoutingService routingService;
+  protected final DistributedLogNamespace namespace;
+  protected final PlacementStateManager placementStateManager;
+  private final Duration refreshInterval;
+
+  protected static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
+  protected final OpStatsLogger placementCalcStats;
+  private Timer placementRefreshTimer;
+
+  public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+                         DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                         Duration refreshInterval, StatsLogger statsLogger) {
+    this.loadAppraiser = loadAppraiser;
+    this.routingService = routingService;
+    this.namespace = namespace;
+    this.placementStateManager = placementStateManager;
+    this.refreshInterval = refreshInterval;
+    placementCalcStats = statsLogger.getOpStatsLogger("placement");
+  }
+
+  public Set<String> getServers() {
+    Set<SocketAddress> hosts = routingService.getHosts();
+    Set<String> servers = new HashSet<String>(hosts.size());
+    for (SocketAddress address: hosts) {
+      servers.add(DLSocketAddress.toString((InetSocketAddress) address));
+    }
+    return servers;
+  }
+
+  public Set<String> getStreams() {
+    Set<String> streams = new HashSet<String>();
+    try {
+      Iterator<String> logs = namespace.getLogs();
+      while (logs.hasNext()) {
+        streams.add(logs.next());
+      }
+    } catch (IOException e) {
+      logger.error("Could not get streams for placement policy.", e);
+    }
+    return streams;
+  }
+
+  public void start(boolean leader) {
+    logger.info("Starting placement policy");
+
+    TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
+    for (String server: getServers()) {
+      emptyServerLoads.add(new ServerLoad(server));
+    }
+    load(emptyServerLoads); //Pre-Load so streams don't NPE
+    if (leader) { //this is the leader shard
+      logger.info("Shard is leader. Scheduling timed refresh.");
+      placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
+      placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
+        @Override
+        public BoxedUnit apply() {
+          refresh();
+          return BoxedUnit.UNIT;
+        }
+      });
+    } else {
+      logger.info("Shard is not leader. Watching for server load changes.");
+      placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
+        @Override
+        public void callback(TreeSet<ServerLoad> serverLoads) {
+          if (!serverLoads.isEmpty()) {
+            load(serverLoads);
+          }
+        }
+      });
+    }
+  }
+
+  public void close() {
+    if (placementRefreshTimer != null) {
+      placementRefreshTimer.stop();
+    }
+  }
+
+  /**
+   * Places the stream on a server according to the policy and returns a future contianing the
+   * host that owns the stream upon completion
+   */
+  public abstract Future<String> placeStream(String stream);
+
+  /**
+   * Recalculates the entire placement mapping and updates stores it using the PlacementStateManager
+   */
+  public abstract void refresh();
+
+  /**
+   * Loads the placement mapping into the node from a TreeSet of ServerLoads
+   */
+  public abstract void load(TreeSet<ServerLoad> serverLoads);
+}
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
new file mode 100644
index 0000000..cd0d906
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.util.TreeSet;
+
+/**
+ * The PlacementStateManager handles persistence of calculated resource placements including, the
+ * storage once the calculated, and the retrieval by the other shards.
+ */
+public interface PlacementStateManager {
+
+  /**
+   * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage
+   */
+  void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
+
+  /**
+   * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage
+   */
+  TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
+
+  /**
+   * Watch the persistent storage for changes to the ownership mapping and calls placementCallback
+   * with the new mapping when a change occurs
+   */
+  void watch(PlacementCallback placementCallback);
+
+  interface PlacementCallback {
+    void callback(TreeSet<ServerLoad> serverLoads);
+  }
+
+  abstract class StateManagerException extends Exception {
+    public StateManagerException(String message, Exception e) {
+      super(message, e);
+    }
+  }
+
+  class StateManagerLoadException extends StateManagerException {
+    public StateManagerLoadException(Exception e) {
+      super("Load of Ownership failed", e);
+    }
+  }
+
+  class StateManagerSaveException extends StateManagerException {
+    public StateManagerSaveException(Exception e) {
+      super("Save of Ownership failed", e);
+    }
+  }
+}
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
new file mode 100644
index 0000000..d7fbcf2
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import com.twitter.distributedlog.service.placement.thrift.*;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * A comparable data object containing the identifier of the server, total appraised load on the
+ * server, and all streams assigned to the server by the resource placement mapping. This is
+ * comparable first by load and then by server so that a sorted data structure of these will be
+ * consistent across multiple calculations.
+ */
+public class ServerLoad implements Comparable {
+  private static final int BUFFER_SIZE = 4096000;
+  private final String server;
+  private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
+  private long load = 0l;
+
+  public ServerLoad(String server) {
+    this.server = server;
+  }
+
+  synchronized public long addStream(StreamLoad stream) {
+    this.load += stream.getLoad();
+    streamLoads.add(stream);
+    return this.load;
+  }
+
+  synchronized public long removeStream(String stream) {
+    for (StreamLoad streamLoad : streamLoads) {
+      if (streamLoad.stream.equals(stream)) {
+        this.load -= load;
+        streamLoads.remove(streamLoad);
+        return this.load;
+      }
+    }
+    return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
+  }
+
+  public long getLoad() {
+    return load;
+  }
+
+  public Set<StreamLoad> getStreamLoads() {
+    return streamLoads;
+  }
+
+  public String getServer() {
+    return server;
+  }
+
+  protected com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
+    com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
+        = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+    tServerLoad.setServer(server);
+    tServerLoad.setLoad(load);
+    ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads
+        = new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>();
+    for (StreamLoad streamLoad: streamLoads) {
+      tStreamLoads.add(streamLoad.toThrift());
+    }
+    tServerLoad.setStreams(tStreamLoads);
+    return tServerLoad;
+  }
+
+  public byte[] serialize() throws IOException {
+    TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      toThrift().write(protocol);
+      transport.flush();
+      return transport.toString(UTF_8.name()).getBytes(UTF_8);
+    } catch (TException e) {
+      throw new IOException("Failed to serialize server load : ", e);
+    } catch (UnsupportedEncodingException uee) {
+      throw new IOException("Failed to serialize server load : ", uee);
+    }
+  }
+
+  public static ServerLoad deserialize(byte[] data) throws IOException {
+    com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
+        = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+    TMemoryInputTransport transport = new TMemoryInputTransport(data);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      tServerLoad.read(protocol);
+      ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
+      if (tServerLoad.isSetStreams()) {
+        for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad : tServerLoad.getStreams()) {
+          serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
+        }
+      }
+      return serverLoad;
+    } catch (TException e) {
+      throw new IOException("Failed to deserialize server load : ", e);
+    }
+  }
+
+  @Override
+  public int compareTo(Object o) {
+    ServerLoad other = (ServerLoad) o;
+    if (load == other.load) {
+      return server.compareTo(other.getServer());
+    } else {
+      return Long.compare(load, other.getLoad());
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    ServerLoad other = (ServerLoad) o;
+    return server.equals(other.getServer()) && load == other.getLoad() && streamLoads.equals(other.getStreamLoads());
+  }
+
+  @Override
+  public String toString() {
+    return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
+  }
+}
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
new file mode 100644
index 0000000..4f3dc71
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * A comparable data object containing the identifier of the stream and the appraised load produced
+ * by the stream.
+ */
+public class StreamLoad implements Comparable {
+  private static final int BUFFER_SIZE = 4096;
+  public final String stream;
+  private final int load;
+
+  public StreamLoad(String stream, int load) {
+    this.stream = stream;
+    this.load = load;
+  }
+
+  public int getLoad() {
+    return load;
+  }
+
+  public String getStream() {
+    return stream;
+  }
+
+  protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() {
+    com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+    return tStreamLoad.setStream(stream).setLoad(load);
+  }
+
+  public byte[] serialize() throws IOException {
+    TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      toThrift().write(protocol);
+      transport.flush();
+      return transport.toString(UTF_8.name()).getBytes(UTF_8);
+    } catch (TException e) {
+      throw new IOException("Failed to serialize stream load : ", e);
+    } catch (UnsupportedEncodingException uee) {
+      throw new IOException("Failed to serialize stream load : ", uee);
+    }
+  }
+
+  public static StreamLoad deserialize(byte[] data) throws IOException {
+    com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+    TMemoryInputTransport transport = new TMemoryInputTransport(data);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      tStreamLoad.read(protocol);
+      return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad());
+    } catch (TException e) {
+      throw new IOException("Failed to deserialize stream load : ", e);
+    }
+  }
+
+  @Override
+  public int compareTo(Object o) {
+    StreamLoad other = (StreamLoad) o;
+    if (load == other.getLoad()) {
+      return stream.compareTo(other.getStream());
+    } else {
+      return Long.compare(load, other.getLoad());
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    StreamLoad other = (StreamLoad) o;
+    return stream.equals(other.getStream()) && load == other.getLoad();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(stream).append(load).build();
+  }
+}
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
new file mode 100644
index 0000000..18b9d1f
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Transaction;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.distributedlog.BKDistributedLogNamespace;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.util.Utils;
+
+/**
+ * An implementation of the PlacementStateManager that saves data to and loads from Zookeeper to
+ * avoid necessitating an additional system for the resource placement.
+ */
+public class ZKPlacementStateManager implements PlacementStateManager {
+  static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
+  private static final String SERVER_LOAD_DIR = "/.server-load";
+
+  private final String serverLoadPath;
+  private final ZooKeeperClient zkClient;
+
+  private boolean watching = false;
+
+  public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
+    zkClient = BKDistributedLogNamespace.createDLZKClientBuilder(
+        String.format("dlzk:%s:factory_writer_shared", uri),
+        conf,
+        DLUtils.getZKServersFromDLUri(uri),
+        statsLogger.scope("dlzk_factory_writer_shared")).build();
+    serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
+  }
+
+  private void createServerLoadPathIfNoExists(byte[] data)
+        throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
+    try {
+      Utils.zkCreateFullPathOptimistic(zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+    } catch (KeeperException.NodeExistsException nee) {
+      logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
+    }
+  }
+
+  @Override
+  public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
+    logger.info("saving ownership");
+    try {
+      ZooKeeper zk = zkClient.get();
+      // use timestamp as data so watchers will see any changes
+      byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+
+      if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist
+        createServerLoadPathIfNoExists(timestamp);
+      }
+
+      Transaction tx = zk.transaction();
+      List<String> children = zk.getChildren(serverLoadPath, false);
+      HashSet<String> servers = new HashSet<String>(children);
+      tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated
+      for (ServerLoad serverLoad : serverLoads) {
+        String server = serverToZkFormat(serverLoad.getServer());
+        String serverPath = serverPath(server);
+        if (servers.contains(server)) {
+          servers.remove(server);
+          tx.setData(serverPath, serverLoad.serialize(), -1);
+        } else {
+          tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+        }
+      }
+      for (String server : servers) {
+        tx.delete(serverPath(server), -1);
+      }
+      tx.commit();
+    } catch (InterruptedException | IOException | KeeperException e) {
+      throw new StateManagerSaveException(e);
+    }
+  }
+
+  @Override
+  public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
+    TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+    try {
+      ZooKeeper zk = zkClient.get();
+      List<String> children = zk.getChildren(serverLoadPath, false);
+      for (String server : children) {
+        ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
+      }
+      return ownerships;
+    } catch (InterruptedException | IOException | KeeperException e) {
+      throw new StateManagerLoadException(e);
+    }
+  }
+
+  @Override
+  synchronized public void watch(final PlacementCallback callback) {
+    if (watching) {
+      return; // do not double watch
+    }
+    watching = true;
+
+    try {
+      ZooKeeper zk = zkClient.get();
+      try {
+        zk.getData(serverLoadPath, new Watcher() {
+          @Override
+          public void process(WatchedEvent watchedEvent) {
+            try {
+              callback.callback(loadOwnership());
+            } catch (StateManagerLoadException e) {
+              logger.error("Watch of Ownership failed", e);
+            } finally {
+              watching = false;
+              watch(callback);
+            }
+          }
+        }, new Stat());
+      } catch (KeeperException.NoNodeException nee) {
+        byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+        createServerLoadPathIfNoExists(timestamp);
+        watching = false;
+        watch(callback);
+      }
+    } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
+      logger.error("Watch of Ownership failed", e);
+      watching = false;
+      watch(callback);
+    }
+  }
+
+  public String serverPath(String server) {
+    return String.format("%s/%s", serverLoadPath, server);
+  }
+
+  protected String serverToZkFormat(String server) {
+    return server.replaceAll("/", "--");
+  }
+
+  protected String zkFormatToServer(String zkFormattedServer) {
+    return zkFormattedServer.replaceAll("--", "/");
+  }
+}
diff --git a/distributedlog-service/src/main/thrift/metadata.thrift b/distributedlog-service/src/main/thrift/metadata.thrift
new file mode 100644
index 0000000..8f7b6ec
--- /dev/null
+++ b/distributedlog-service/src/main/thrift/metadata.thrift
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+namespace java com.twitter.distributedlog.service.placement.thrift
+
+struct StreamLoad {
+    1: optional string stream
+    2: optional i32 load
+}
+
+struct ServerLoad {
+    1: optional string server
+    2: optional i64 load
+    3: optional list<StreamLoad> streams
+}
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index d7a0ba6..1bfe352 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -28,6 +28,7 @@
 import com.twitter.distributedlog.exceptions.StreamUnavailableException;
 import com.twitter.distributedlog.service.config.NullStreamConfigProvider;
 import com.twitter.distributedlog.service.config.ServerConfiguration;
+import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
 import com.twitter.distributedlog.service.stream.WriteOp;
 import com.twitter.distributedlog.service.stream.StreamImpl.StreamStatus;
 import com.twitter.distributedlog.service.stream.StreamImpl;
@@ -140,16 +141,17 @@
             converter = new IdentityStreamPartitionConverter();
         }
         return new DistributedLogServiceImpl(
-                serverConf,
-                dlConf,
-                ConfUtils.getConstDynConf(dlConf),
-                new NullStreamConfigProvider(),
-                uri,
-                converter,
-                new LocalRoutingService(),
-                NullStatsLogger.INSTANCE,
-                NullStatsLogger.INSTANCE,
-                latch);
+            serverConf,
+            dlConf,
+            ConfUtils.getConstDynConf(dlConf),
+            new NullStreamConfigProvider(),
+            uri,
+            converter,
+            new LocalRoutingService(),
+            NullStatsLogger.INSTANCE,
+            NullStatsLogger.INSTANCE,
+            latch,
+            new EqualLoadAppraiser());
     }
 
     private StreamImpl createUnstartedStream(DistributedLogServiceImpl service,
@@ -777,21 +779,21 @@
                 .addHost("stream-0", service.getServiceAddress().getSocketAddress())
                 .setAllowRetrySameHost(false);
 
-        // routing service doesn't know 'stream-1'
+        service.startPlacementPolicy();
+
         WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext()));
-        assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
+        assertEquals(StatusCode.FOUND, response.getHeader().getCode());
+        assertEquals(service.getServiceAddress().toString(),
+                response.getHeader().getLocation());
 
-        // service cache "stream-2" but not acquire
+        // service cache "stream-2"
         StreamImpl stream = (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2", false);
-        response = FutureUtils.result(service.getOwner("stream-2", new WriteContext()));
-        assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
-
         // create write ops to stream-2 to make service acquire the stream
         WriteOp op = createWriteOp(service, "stream-2", 0L);
         stream.submit(op);
         stream.start();
         WriteResponse wr = Await.result(op.result());
-        assertEquals("Op  should succeed",
+        assertEquals("Op should succeed",
                 StatusCode.SUCCESS, wr.getHeader().getCode());
         assertEquals("Service should acquire stream",
                 StreamStatus.INITIALIZED, stream.getStatus());
@@ -804,18 +806,6 @@
         assertEquals(StatusCode.FOUND, response.getHeader().getCode());
         assertEquals(service.getServiceAddress().toString(),
                 response.getHeader().getLocation());
-
-        // find the stream from the routing service
-        response = FutureUtils.result(service.getOwner("stream-0", new WriteContext()));
-        assertEquals(StatusCode.FOUND, response.getHeader().getCode());
-        assertEquals(service.getServiceAddress().toString(),
-                response.getHeader().getLocation());
-
-        // add the tried host
-        WriteContext ctx = new WriteContext();
-        ctx.addToTriedHosts(DLSocketAddress.toString(service.getServiceAddress().getSocketAddress()));
-        response = FutureUtils.result(service.getOwner("stream-0", ctx));
-        assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
     }
 
 }
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
new file mode 100644
index 0000000..ab4eeae
--- /dev/null
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.LinkedHashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.runtime.BoxedUnit;
+
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Time;
+import com.twitter.util.Timer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestLeastLoadPlacementPolicy {
+
+  @Test
+  public void testCalculateBalances() throws Exception {
+    int numSevers = new Random().nextInt(20) + 1;
+    int numStreams = new Random().nextInt(200) + 1;
+    RoutingService mockRoutingService = mock(RoutingService.class);
+    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+    LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+        new EqualLoadAppraiser(), mockRoutingService, mockNamespace, null, Duration.fromSeconds(600), new NullStatsLogger());
+    TreeSet<ServerLoad> serverLoads = Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
+    long lowLoadPerServer = numStreams / numSevers;
+    long highLoadPerServer = lowLoadPerServer + 1;
+    for (ServerLoad serverLoad: serverLoads) {
+      long load = serverLoad.getLoad();
+      assertEquals(load, serverLoad.getStreamLoads().size());
+      assertTrue(String.format("Load %d is not between %d and %d", load, lowLoadPerServer, highLoadPerServer), load == lowLoadPerServer || load == highLoadPerServer);
+    }
+  }
+
+  @Test
+  public void testRefreshAndPlaceStream() throws Exception {
+    int numSevers = new Random().nextInt(20) + 1;
+    int numStreams = new Random().nextInt(200) + 1;
+    RoutingService mockRoutingService = mock(RoutingService.class);
+    when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers));
+    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+    try {
+      when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
+    } catch (IOException e) {
+      fail();
+    }
+    PlacementStateManager mockPlacementStateManager = mock(PlacementStateManager.class);
+    LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+        new EqualLoadAppraiser(), mockRoutingService, mockNamespace, mockPlacementStateManager, Duration.fromSeconds(600), new NullStatsLogger());
+    leastLoadPlacementPolicy.refresh();
+
+    final ArgumentCaptor<TreeSet> captor = ArgumentCaptor.forClass(TreeSet.class);
+    verify(mockPlacementStateManager).saveOwnership(captor.capture());
+    TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>)captor.getValue();
+    ServerLoad next = serverLoads.first();
+    String serverPlacement = Await.result(leastLoadPlacementPolicy.placeStream("newstream1"));
+    assertEquals(next.getServer(), serverPlacement);
+  }
+
+  @Test
+  public void testCalculateUnequalWeight() throws Exception {
+    int numSevers = new Random().nextInt(20) + 1;
+    int numStreams = new Random().nextInt(200) + 1;
+    /* use AtomicInteger to have a final object in answer method */
+    final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
+    RoutingService mockRoutingService = mock(RoutingService.class);
+    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+    LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
+    when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() {
+      @Override
+      public Future<StreamLoad> answer(InvocationOnMock invocationOnMock) throws Throwable {
+        int load = new Random().nextInt(100000);
+        if (load > maxLoad.get()) {
+          maxLoad.set(load);
+        }
+        return Future.value(new StreamLoad(invocationOnMock.getArguments()[0].toString(), load));
+      }
+    });
+    LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+        mockLoadAppraiser, mockRoutingService, mockNamespace, null, Duration.fromSeconds(600), new NullStatsLogger());
+    TreeSet<ServerLoad> serverLoads = Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
+    long highestLoadSeen = Long.MIN_VALUE;
+    long lowestLoadSeen = Long.MAX_VALUE;
+    for (ServerLoad serverLoad: serverLoads) {
+      long load = serverLoad.getLoad();
+      if (load < lowestLoadSeen) {
+        lowestLoadSeen = load;
+      }
+      if (load > highestLoadSeen) {
+        highestLoadSeen = load;
+      }
+    }
+    assertTrue(highestLoadSeen - lowestLoadSeen < maxLoad.get());
+  }
+
+  private Set<SocketAddress> generateSocketAddresses(int num) {
+    LinkedHashSet<SocketAddress> socketAddresses = new LinkedHashSet<SocketAddress>();
+    for (int i = 0; i < num; i++) {
+      socketAddresses.add(new InetSocketAddress(i));
+    }
+    return socketAddresses;
+  }
+
+  private Set<String> generateStreams(int num) {
+    LinkedHashSet<String> streams = new LinkedHashSet<String>();
+    for (int i = 0; i < num; i++) {
+      streams.add("stream_" + i);
+    }
+    return streams;
+  }
+
+  private Set<String> generateServers(int num) {
+    LinkedHashSet<String> servers = new LinkedHashSet<String>();
+    for (int i = 0; i < num; i++) {
+      servers.add("server_" + i);
+    }
+    return servers;
+  }
+}
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
new file mode 100644
index 0000000..bbd7e72
--- /dev/null
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestServerLoad {
+
+  @Test
+  public void testSerializeDeserialize() throws IOException {
+    final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
+    for (int i = 0; i < 20; i++) {
+      serverLoad.addStream(new StreamLoad("stream-"+i, i));
+    }
+    assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize()));
+  }
+
+  @Test
+  public void testGetLoad() throws IOException {
+    final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
+    assertEquals(0, serverLoad.getLoad());
+    serverLoad.addStream(new StreamLoad("stream-"+1, 3));
+    assertEquals(3, serverLoad.getLoad());
+    serverLoad.addStream(new StreamLoad("stream-"+2, 7));
+    assertEquals(10, serverLoad.getLoad());
+    serverLoad.addStream(new StreamLoad("stream-"+3, 1));
+    assertEquals(11, serverLoad.getLoad());
+  }
+}
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
new file mode 100644
index 0000000..3a3e5c0
--- /dev/null
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestStreamLoad {
+
+  @Test
+  public void testSerializeDeserialize() throws IOException {
+    final String streamName = "aHellaRandomStreamName";
+    final int load = 1337;
+    final StreamLoad streamLoad = new StreamLoad(streamName, load);
+    assertEquals(streamLoad, StreamLoad.deserialize(streamLoad.serialize()));
+  }
+}
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
new file mode 100644
index 0000000..b104952
--- /dev/null
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.distributedlog.DistributedLogConfiguration;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import static com.twitter.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
+
+public class TestZKPlacementStateManager {
+  private TestingServer zkTestServer;
+  private String zkServers;
+  private URI uri;
+  private ZKPlacementStateManager zkPlacementStateManager;
+
+  @Before
+  public void startZookeeper() throws Exception {
+    zkTestServer = new TestingServer(2181);
+    zkServers = "127.0.0.1:2181";
+    uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
+    zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
+  }
+
+  @Test
+  public void testSaveLoad() throws Exception {
+    TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+    zkPlacementStateManager.saveOwnership(ownerships);
+    SortedSet<ServerLoad> loadedOwnerships = zkPlacementStateManager.loadOwnership();
+    assertEquals(ownerships, loadedOwnerships);
+
+    ownerships.add(new ServerLoad("emptyServer"));
+    zkPlacementStateManager.saveOwnership(ownerships);
+    loadedOwnerships = zkPlacementStateManager.loadOwnership();
+    assertEquals(ownerships, loadedOwnerships);
+
+    ServerLoad sl1 = new ServerLoad("server1");
+    sl1.addStream(new StreamLoad("stream1", 3));
+    sl1.addStream(new StreamLoad("stream2", 4));
+    ServerLoad sl2 = new ServerLoad("server2");
+    sl2.addStream(new StreamLoad("stream3", 1));
+    ownerships.add(sl1);
+    ownerships.add(sl2);
+    zkPlacementStateManager.saveOwnership(ownerships);
+    loadedOwnerships = zkPlacementStateManager.loadOwnership();
+    assertEquals(ownerships, loadedOwnerships);
+
+    loadedOwnerships.remove(sl1);
+    zkPlacementStateManager.saveOwnership(ownerships);
+    loadedOwnerships = zkPlacementStateManager.loadOwnership();
+    assertEquals(ownerships, loadedOwnerships);
+  }
+
+  @Test
+  public void testWatchIndefinitely() throws Exception {
+    TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+    ownerships.add(new ServerLoad("server1"));
+    PlacementStateManager.PlacementCallback callback = mock(PlacementStateManager.PlacementCallback.class);
+    zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching
+    zkPlacementStateManager.watch(callback);
+    // cannot verify the callback here as it may call before the verify is called
+
+    zkPlacementStateManager.saveOwnership(ownerships);
+    verify(callback, timeout(1000)).callback(ownerships);
+
+    ServerLoad server2 = new ServerLoad("server2");
+    server2.addStream(new StreamLoad("hella-important-stream", 415));
+    ownerships.add(server2);
+    zkPlacementStateManager.saveOwnership(ownerships);
+    verify(callback, timeout(1000)).callback(ownerships);
+
+    server2.removeStream("server1");
+    zkPlacementStateManager.saveOwnership(ownerships);
+    verify(callback, timeout(1000)).callback(ownerships);
+  }
+
+  @Test
+  public void testZkFormatting() throws Exception {
+    final String server = "smf1-eci-41-sr1.prod.twitter.com/10.70.186.139:31351";
+    final String zkFormattedServer = "smf1-eci-41-sr1.prod.twitter.com--10.70.186.139:31351";
+    URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
+    ZKPlacementStateManager zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
+    assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server));
+    assertEquals(server, zkPlacementStateManager.zkFormatToServer(zkFormattedServer));
+  }
+
+  @After
+  public void stopZookeeper() throws IOException {
+    zkTestServer.stop();
+  }
+}