Merge pull request #29 from twitter/sijie/start_stop_scripts

Add documentation on running distributedlog in distributed mode
diff --git a/distributedlog-benchmark/bin/dbench b/distributedlog-benchmark/bin/dbench
new file mode 100755
index 0000000..667c6a5
--- /dev/null
+++ b/distributedlog-benchmark/bin/dbench
@@ -0,0 +1,204 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+# check if net.ipv6.bindv6only is set to 1
+bindv6only=$(/sbin/sysctl -n net.ipv6.bindv6only 2> /dev/null)
+if [ -n "$bindv6only" ] && [ "$bindv6only" -eq "1" ]
+then
+  echo "Error: \"net.ipv6.bindv6only\" is set to 1 - Java networking could be broken"
+  echo "For more info (the following page also applies to dlog): http://wiki.apache.org/hadoop/HadoopIPv6"
+  exit 1
+fi
+
+# See the following page for extensive details on setting
+# up the JVM to accept JMX remote management:
+# http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
+# by default we allow local JMX connections
+if [ "x$JMXLOCALONLY" = "x" ]
+then
+    JMXLOCALONLY=false
+fi
+
+if [ "x$JMXDISABLE" = "x" ]
+then
+    echo "JMX enabled by default" >&2
+    # for some reason these two options are necessary on jdk6 on Ubuntu
+#   according to the docs they are not necessary, but otherwise jconsole cannot
+    #   do a local attach
+    JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY"
+else
+    echo "JMX disabled by user request" >&2
+fi
+
+BINDIR=`dirname "$0"`
+DLOG_HOME=`cd $BINDIR/.. > /dev/null;pwd`
+
+DEFAULT_LOG_CONF=$DLOG_HOME/conf/log4j.properties
+
+source $DLOG_HOME/conf/dlogenv.sh
+
+# exclude tests jar
+RELEASE_JAR=`ls $DLOG_HOME/distributedlog-*.jar 2> /dev/null | egrep -v 'tests|javadoc|sources' | tail -1`
+if [ $? == 0 ]; then
+    DLOG_JAR=$RELEASE_JAR
+fi
+
+# exclude tests jar
+BUILT_JAR=`ls $DLOG_HOME/target/distributedlog-*.jar 2> /dev/null | egrep -v 'tests|javadoc|sources' | tail -1`
+if [ $? != 0 ] && [ ! -e "$DLOG_JAR" ]; then
+    echo "\nCouldn't find dlog jar.";
+    echo "Make sure you've run 'mvn package'\n";
+    exit 1;
+elif [ -e "$BUILT_JAR" ]; then
+    DLOG_JAR=$BUILT_JAR
+fi
+
+dbench_help() {
+    cat <<EOF
+Usage: dbench <command>
+where command is one of:
+    bkwrite             Benchmark bookkeeper using distributedlog core library
+    write               Benchmark distributedlog write proxy using thin client
+    read                Benchmark distributedlog read using distributedlog core library
+
+or command is the full name of a class with a defined main() method.
+
+Environment variables:
+   DLOG_LOG_CONF        Log4j configuration file (default $DEFAULT_LOG_CONF)
+   DLOG_EXTRA_OPTS      Extra options to be passed to the jvm
+   DLOG_EXTRA_CLASSPATH Add extra paths to the dlog classpath
+
+These variables can also be set in conf/dlogenv.sh
+EOF
+}
+
+add_maven_deps_to_classpath() {
+    MVN="mvn"
+    if [ "$MAVEN_HOME" != "" ]; then
+	MVN=${MAVEN_HOME}/bin/mvn
+    fi
+
+    # Need to generate classpath from maven pom. This is costly so generate it
+    # and cache it. Save the file into our target dir so a mvn clean will
+    # clean it up and force us to create a new one.
+    f="${DLOG_HOME}/target/cached_classpath.txt"
+    if [ ! -f "${f}" ]
+    then
+	${MVN} -f "${DLOG_HOME}/pom.xml" dependency:build-classpath -Dmdep.outputFile="${f}" &> /dev/null
+    fi
+    DLOG_CLASSPATH=${CLASSPATH}:`cat "${f}"`
+}
+
+if [ -d "$DLOG_HOME/lib" ]; then
+    for i in $DLOG_HOME/lib/*.jar; do
+	DLOG_CLASSPATH=$DLOG_CLASSPATH:$i
+    done
+else
+    add_maven_deps_to_classpath
+fi
+
+# if no args specified, show usage
+if [ $# = 0 ]; then
+    dbench_help;
+    exit 1;
+fi
+
+# get arguments
+COMMAND=$1
+shift
+
+if [ -z "$DLOG_LOG_CONF" ]; then
+    DLOG_LOG_CONF=$DEFAULT_LOG_CONF
+fi
+
+DLOG_CLASSPATH="$DLOG_JAR:$DLOG_CLASSPATH:$DLOG_EXTRA_CLASSPATH"
+if [ "$DLOG_LOG_CONF" != "" ]; then
+    DLOG_CLASSPATH="`dirname $DLOG_LOG_CONF`:$DLOG_CLASSPATH"
+    OPTS="$OPTS -Dlog4j.configuration=`basename $DLOG_LOG_CONF`"
+fi
+OPTS="-cp $DLOG_CLASSPATH $OPTS $DLOG_EXTRA_OPTS"
+
+OPTS="$OPTS $DLOG_EXTRA_OPTS"
+
+# Disable ipv6 as it can cause issues
+OPTS="$OPTS -Djava.net.preferIPv4Stack=true"
+
+# log directory & file
+DLOG_ROOT_LOGGER=${DLOG_ROOT_LOGGER:-"INFO,R"}
+DLOG_LOG_DIR=${DLOG_LOG_DIR:-"$DLOG_HOME/logs"}
+DLOG_LOG_FILE=${DLOG_LOG_FILE:-"dbench.log"}
+
+#Configure log configuration system properties
+OPTS="$OPTS -Ddlog.root.logger=$DLOG_ROOT_LOGGER"
+OPTS="$OPTS -Ddlog.log.dir=$DLOG_LOG_DIR"
+OPTS="$OPTS -Ddlog.log.file=$DLOG_LOG_FILE"
+
+BENCH_ARGS="""
+  --provider ${STATS_PROVIDER} \\
+  --conf ${BENCH_CONF_FILE:-"${DLOG_HOME}/conf/benchmark.conf"} \\
+  --streamprefix ${STREAM_NAME_PREFIX} \\
+  --duration ${BENCHMARK_DURATION} \\
+  --shard ${BENCHMARK_SHARD_ID} \\
+  --uri ${DL_NAMESPACE} \\
+  --streamcount ${NUM_STREAMS} \\
+  --thriftmux \\
+  --handshake-with-client-info \\
+  --concurrency 1
+"""
+
+#Change to DLOG_HOME to support relative paths
+cd "$DLOG_HOME"
+if [ $COMMAND == "bkwrite" ]; then
+    BENCH_WRITE_ARGS="""
+    --messagesize ${MSG_SIZE} \\
+    --rate ${INITIAL_RATE} \\
+    --max-rate ${MAX_RATE} \\
+    --change-rate ${CHANGE_RATE} \\
+    --change-interval ${CHANGE_RATE_INTERVAL}
+    """
+    BENCH_ARGS="${BENCH_ARGS} \\ ${BENCH_WRITE_ARGS} \\ --mode dlwrite \\"
+    exec java $OPTS $JMX_ARGS com.twitter.distributedlog.benchmark.Benchmarker $BENCH_ARGS $@
+elif [ $COMMAND == "write" ]; then
+    BENCH_WRITE_ARGS="""
+    --messagesize ${MSG_SIZE} \\
+    --rate ${INITIAL_RATE} \\
+    --max-rate ${MAX_RATE} \\
+    --change-rate ${CHANGE_RATE} \\
+    --change-interval ${CHANGE_RATE_INTERVAL} \\
+    --finagle-name ${DL_WP_FINAGLE_NAME}
+    """
+    BENCH_ARGS="${BENCH_ARGS} \\ ${BENCH_WRITE_ARGS} \\ --mode write \\"
+    exec java $OPTS $JMX_ARGS com.twitter.distributedlog.benchmark.Benchmarker $BENCH_ARGS $@
+elif [ $COMMAND == "read" ]; then
+    BENCH_READ_ARGS="""
+    --readers-per-stream ${NUM_READERS_PER_STREAM} \\
+    --max-stream-id ${MAX_STREAM_ID} \\
+    --truncation-interval ${TRUNCATION_INTERVAL} \\
+    --finagle-name ${DL_WP_FINAGLE_NAME}
+    """
+    BENCH_ARGS="${BENCH_ARGS} \\ ${BENCH_READ_ARGS} \\ --mode read \\"
+    exec java $OPTS $JMX_ARGS com.twitter.distributedlog.benchmark.Benchmarker $BENCH_ARGS $@
+elif [ $COMMAND == "help" ]; then
+    dbench_help;
+else
+    exec java $OPTS $COMMAND $@
+fi
+
+
diff --git a/distributedlog-benchmark/conf/benchmark.conf b/distributedlog-benchmark/conf/benchmark.conf
new file mode 100644
index 0000000..9e9c8f8
--- /dev/null
+++ b/distributedlog-benchmark/conf/benchmark.conf
@@ -0,0 +1,81 @@
+########################
+# ZooKeeper Client Settings
+########################
+
+# zookeeper settings
+zkSessionTimeoutSeconds=60
+zkNumRetries=100
+zkRetryStartBackoffMillis=100
+zkRetryMaxBackoffMillis=200
+# bkc zookeeper settings
+bkcZKSessionTimeoutSeconds=60
+bkcZKNumRetries=100
+bkcZKRetryStartBackoffMillis=100
+bkcZKRetryMaxBackoffMillis=200
+
+########################
+# BookKeeper Client Settings
+########################
+
+# bookkeeper client timeouts
+bkcReadTimeoutSeconds=20
+bkcNumWorkerThreads=16
+bkc.numChannelsPerBookie=1
+bkc.enableTaskExecutionStats=true
+bkc.connectTimeoutMillis=200
+
+### Readers
+bkc.firstSpeculativeReadTimeout = 10
+bkc.maxSpeculativeReadTimeout = 30
+bkc.firstSpeculativeReadLACTimeout = 10
+bkc.maxSpeculativeReadLACTimeout = 2000
+bkc.ensemblePlacementPolicy=org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy
+ReadAheadMaxEntries = 1000
+ReadAheadBatchSize = 4
+numWorkerThreads = 16
+readLACLongPollTimeout = 10000
+traceReadAheadDeliveryLatency = true
+traceReadAheadMetadataChanges = true
+readTimeout = 20
+
+### Writers
+
+# Metadata Settings
+# ledger metadata version that supports sequence id
+ledger-metadata-layout=5
+# use allocator pool for proxy
+enableLedgerAllocatorPool=false
+# check stream exists or not
+createStreamIfNotExists=true
+# encode dc id in version
+encodeDCIDInVersion=true
+
+### Write Performance Related Settings
+
+# ensemble settings
+ensemble-size=3
+write-quorum-size=3
+ack-quorum-size=2
+bkc.ensemblePlacementPolicy=org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy
+
+# flush policies
+enableImmediateFlush=false
+output-buffer-size=16000
+periodicFlushFrequencyMilliSeconds=10
+
+# retention policy
+retention-size=4
+# rolling ledgers (enable time rolling): 120 minutes = 2 hours
+rolling-interval=120
+# max logsegment bytes : 2GB
+maxLogSegmentBytes=2147483648
+# rolling concurrency
+logSegmentRollingConcurrency=1
+# disable sanityCheckDelete
+sanityCheckDelete=false
+# compression codec
+compressionType=lz4
+
+### Stats
+# Exporting codahale stats
+codahaleStatsHttpPort=10002
diff --git a/distributedlog-benchmark/conf/dlogenv.sh b/distributedlog-benchmark/conf/dlogenv.sh
index 4f92d5d..608294e 100644
--- a/distributedlog-benchmark/conf/dlogenv.sh
+++ b/distributedlog-benchmark/conf/dlogenv.sh
@@ -30,3 +30,63 @@
 
 # Add extra paths to the dlog classpath
 # DLOG_EXTRA_CLASSPATH=
+
+########################
+# Benchmark Arguments
+########################
+
+# Configuration File
+# BENCH_CONF_FILE=
+# Stats Provider
+STATS_PROVIDER=org.apache.bookkeeper.stats.CodahaleMetricsServletProvider
+# Stream Name Prefix
+STREAM_NAME_PREFIX=distributedlog-smoketest
+# Benchmark Run Duration in minutes
+BENCHMARK_DURATION=60
+# DL Namespace
+DL_NAMESPACE=distributedlog://127.0.0.1:2181/messaging/distributedlog/mynamespace
+# Benchmark SHARD id
+BENCHMARK_SHARD_ID=0
+
+# How many streams
+NUM_STREAMS=100
+
+# Max stream id (exclusively)
+MAX_STREAM_ID=100
+
+#########
+# Writer
+#########
+
+# Start stream id
+START_STREAM_ID=0
+# End stream id (inclusively)
+END_STREAM_ID=99
+
+# Message Size
+MSG_SIZE=1024
+
+# Write Rate
+# Initial rate - messages per second
+INITIAL_RATE=1
+# Max rate - messages per second
+MAX_RATE=1000
+# Rate change each interval - messages per second
+CHANGE_RATE=100
+# Rate change interval, in seconds
+CHANGE_RATE_INTERVAL=300
+
+# DL Write Proxy Finagle Name
+DL_WP_FINAGLE_NAME='zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy'
+
+##########
+# Reader 
+##########
+
+### Reader could be run in a sharded way. Each shard is responsible for
+### reading a subset of the streams. A stream could be configured to be
+### read by multiple shards.
+NUM_READERS_PER_STREAM=1
+
+### Interval that reader issues truncate requests to truncate the streams, in seconds
+TRUNCATION_INTERVAL=600
diff --git a/distributedlog-benchmark/conf/log4j.properties b/distributedlog-benchmark/conf/log4j.properties
index cca2aa6..605049f 100644
--- a/distributedlog-benchmark/conf/log4j.properties
+++ b/distributedlog-benchmark/conf/log4j.properties
@@ -2,32 +2,36 @@
 # DistributedLog Logging Configuration
 #
 
-dlog.root.logger=INFO, stderr
-dlog.log.dir=.
-dlog.log.file=bookkeeper-server.log
+# Default values
+dlog.root.logger=INFO, R
+dlog.log.dir=logs
+dlog.log.file=dlog.log
 
-# Example with rolling log file
 log4j.rootLogger=${dlog.root.logger}
-
-#disable zookeeper logging
 log4j.logger.org.apache.zookeeper=INFO
-#Set the bookkeeper level to warning
 log4j.logger.org.apache.bookkeeper=INFO
 
-# Add ROLLINGFILE to rootLogger to get log file output
-#    Log DEBUG level and above messages to a log file
-#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
-#log4j.appender.ROLLINGFILE.Threshold=INFO
-#log4j.appender.ROLLINGFILE.File=distributedlog.log
-#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
-#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
-#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+# redirect executor output to executors.log since slow op warnings can be quite verbose
+log4j.logger.com.twitter.distributedlog.util.MonitoredFuturePool=INFO, Executors
+log4j.logger.com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
+log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
+log4j.additivity.com.twitter.distributedlog.util.MonitoredFuturePool=false
+log4j.additivity.com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
+log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
+
+log4j.appender.Executors=org.apache.log4j.RollingFileAppender
+log4j.appender.Executors.Threshold=INFO
+log4j.appender.Executors.File=${dlog.log.dir}/executors.log
+log4j.appender.Executors.MaxFileSize=20MB
+log4j.appender.Executors.MaxBackupIndex=5
+log4j.appender.Executors.layout=org.apache.log4j.PatternLayout
+log4j.appender.Executors.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
 
 log4j.appender.R=org.apache.log4j.RollingFileAppender
 log4j.appender.R.Threshold=INFO
 log4j.appender.R.File=${dlog.log.dir}/${dlog.log.file}
-log4j.appender.R.MaxFileSize=200MB
-log4j.appender.R.MaxBackupIndex=7
+log4j.appender.R.MaxFileSize=20MB
+log4j.appender.R.MaxBackupIndex=50
 log4j.appender.R.layout=org.apache.log4j.PatternLayout
 log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
 
diff --git a/distributedlog-benchmark/pom.xml b/distributedlog-benchmark/pom.xml
index 84cf424..0833fae 100644
--- a/distributedlog-benchmark/pom.xml
+++ b/distributedlog-benchmark/pom.xml
@@ -33,7 +33,7 @@
     </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
-      <artifactId>distributedlog-core</artifactId>
+      <artifactId>distributedlog-service</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
     <dependency>
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java
index 1e6a85e..44f3179 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java
@@ -36,7 +36,9 @@
      * @return routing service builder
      */
     public static RoutingService.Builder buildRoutingService(String finagleNameStr) {
-        if (!finagleNameStr.startsWith("serverset!") && !finagleNameStr.startsWith("inet!")) {
+        if (!finagleNameStr.startsWith("serverset!")
+                && !finagleNameStr.startsWith("inet!")
+                && !finagleNameStr.startsWith("zk!")) {
             // We only support serverset based names at the moment
             throw new UnsupportedOperationException("Finagle Name format not supported for name: " + finagleNameStr);
         }
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java
index b49f681..92c7c29 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java
@@ -18,12 +18,16 @@
 package com.twitter.distributedlog.client.serverset;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.net.HostAndPort;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.zookeeper.ServerSet;
 import com.twitter.common.zookeeper.ServerSets;
 import com.twitter.common.zookeeper.ZooKeeperClient;
+import org.apache.commons.lang.StringUtils;
 import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -33,19 +37,32 @@
  */
 public class DLZkServerSet {
 
+    private static final Logger logger = LoggerFactory.getLogger(DLZkServerSet.class);
+
     static final String ZNODE_WRITE_PROXY = ".write_proxy";
 
     private static String getZKServersFromDLUri(URI uri) {
         return uri.getAuthority().replace(";", ",");
     }
 
+    private static Iterable<InetSocketAddress> getZkAddresses(URI uri) {
+        String zkServers = getZKServersFromDLUri(uri);
+        String[] zkServerList = StringUtils.split(zkServers, ',');
+        ImmutableList.Builder<InetSocketAddress> builder = ImmutableList.builder();
+        for (String zkServer : zkServerList) {
+            HostAndPort hostAndPort = HostAndPort.fromString(zkServer).withDefaultPort(2181);
+            builder.add(InetSocketAddress.createUnresolved(
+                    hostAndPort.getHostText(),
+                    hostAndPort.getPort()));
+        }
+        return builder.build();
+    }
+
     public static DLZkServerSet of(URI uri,
                                    int zkSessionTimeoutMs) {
         // Create zookeeper and server set
-        String zkServers = getZKServersFromDLUri(uri);
         String zkPath = uri.getPath() + "/" + ZNODE_WRITE_PROXY;
-        Iterable<InetSocketAddress> zkAddresses =
-                ImmutableList.of(InetSocketAddress.createUnresolved(zkServers, 2181));
+        Iterable<InetSocketAddress> zkAddresses = getZkAddresses(uri);
         ZooKeeperClient zkClient =
                 new ZooKeeperClient(Amount.of(zkSessionTimeoutMs, Time.MILLISECONDS), zkAddresses);
         ServerSet serverSet = ServerSets.create(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, zkPath);
diff --git a/distributedlog-core/conf/bookie.conf.template b/distributedlog-core/conf/bookie.conf.template
new file mode 100644
index 0000000..41e287c
--- /dev/null
+++ b/distributedlog-core/conf/bookie.conf.template
@@ -0,0 +1,185 @@
+#!/bin/sh
+#
+#/**
+# * Copyright 2007 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+## Bookie settings
+
+# Port that bookie server listen on
+bookiePort=3181
+
+# TODO: change the journal directory
+# Directory Bookkeeper outputs its write ahead log
+journalDirectory=/tmp/data/bk/journal
+
+# TODO: change the ledgers directory
+# Directory Bookkeeper outputs ledger snapshots
+ledgerDirectories=/tmp/data/bk/ledgers
+
+# TODO: change the index directory
+# Directory in which index files will be stored.
+indexDirectories=/tmp/data/bk/ledgers
+
+# Ledger Manager Class
+# What kind of ledger manager is used to manage how ledgers are stored, managed
+# and garbage collected. Try to read 'BookKeeper Internals' for detail info.
+ledgerManagerType=hierarchical
+
+# Root zookeeper path to store ledger metadata
+# This parameter is used by zookeeper-based ledger manager as a root znode to
+# store all ledgers.
+zkLedgersRootPath=/messaging/bookkeeper/ledgers
+
+# Max file size of entry logger, in bytes
+# A new entry log file will be created when the old one reaches the file size limitation
+logSizeLimit=1073741823
+
+# Max file size of journal file, in mega bytes
+# A new journal file will be created when the old one reaches the file size limitation
+#
+journalMaxSizeMB=2048
+
+# Max number of old journal files to keep
+# Keeping a number of old journal files would help data recovery in special cases
+#
+journalMaxBackups=5
+
+# How long the interval to trigger next garbage collection, in milliseconds
+# Since garbage collection is running in the background, too frequent gc
+# will hurt performance. It is better to use a longer gc
+# interval if there is enough disk capacity.
+# gc per 1 hour (aligning with most DL rolling interval)
+gcInitialWaitTime=600000
+gcWaitTime=3600000
+# do minor compaction per 2 hours
+minorCompactionInterval=7200
+minorCompactionThreshold=0.2
+# disable major compaction
+majorCompactionInterval=0
+# reduce major compaction threshold to a low value to prevent bad force compaction behavior
+majorCompactionThreshold=0.3
+# Compaction Rate & Max Outstanding
+compactionRate=10737418
+compactionMaxOutstandingRequests=10737418
+
+# How long the interval to flush ledger index pages to disk, in milliseconds
+# Flushing index files will introduce much random disk I/O.
+# If separating journal dir and ledger dirs each on different devices,
+# flushing would not affect performance. But if putting journal dir
+# and ledger dirs on same device, performance degrade significantly
+# on too frequent flushing. You can consider incrementing the flush interval
+# to get better performance, but you will need to spend more time on bookie
+# server restart after a failure.
+#
+flushInterval=1000
+
+# Interval to watch whether bookie is dead or not, in milliseconds
+#
+# bookieDeathWatchInterval=1000
+
+## zookeeper client settings
+
+# A list of one or more servers on which zookeeper is running.
+# The server list can be comma separated values, for example:
+# zkServers=zk1:2181,zk2:2181,zk3:2181
+zkServers=localhost:2181
+
+# ZooKeeper client session timeout in milliseconds
+# Bookie server will exit if it received SESSION_EXPIRED because it
+# was partitioned off from ZooKeeper for more than the session timeout
+# JVM garbage collection or disk I/O can cause SESSION_EXPIRED.
+# Incrementing this value could help avoid this issue
+zkTimeout=30000
+
+## NIO Server settings
+
+# This setting is used to enable/disable Nagle's algorithm, which is a means of
+# improving the efficiency of TCP/IP networks by reducing the number of packets
+# that need to be sent over the network.
+# If you are sending many small messages, such that more than one can fit in
+# a single IP packet, setting serverTcpNoDelay to false to enable the Nagle algorithm
+# can provide better performance.
+# Default value is true.
+#
+serverTcpNoDelay=true
+
+## ledger cache settings
+
+# Max number of ledger index files that could be opened in the bookie server
+# If the number of ledger index files reaches this limitation, the bookie
+# server starts to swap some ledgers from memory to disk.
+# Too frequent swap will affect performance. You can tune this number
+# to gain performance according your requirements.
+openFileLimit=20000
+
+# Size of a index page in ledger cache, in bytes
+# A larger index page can improve performance writing page to disk,
+# which is efficient when you have a small number of ledgers and these
+# ledgers have similar number of entries.
+# If you have large number of ledgers and each ledger has fewer entries,
+# smaller index page would improve memory usage.
+pageSize=8192
+
+# How many index pages provided in ledger cache
+# If the number of index pages reaches this limitation, the bookie server
+# starts to swap some ledgers from memory to disk. You can increment
+# this value when you find swapping becomes more frequent. But make sure
+# pageLimit*pageSize is not more than the JVM max memory limitation,
+# otherwise you would get an OutOfMemoryException.
+# In general, incrementing pageLimit and using a smaller index page would
+# gain better performance in the case of a large number of ledgers with fewer entries
+# If pageLimit is -1, bookie server will use 1/3 of JVM memory to compute
+# the limitation of number of index pages.
+pageLimit=131072
+
+#If all ledger directories configured are full, then support only read requests for clients.
+#If "readOnlyModeEnabled=true" then on all ledger disks full, bookie will be converted
+#to read-only mode and serve only read requests. Otherwise the bookie will be shutdown.
+readOnlyModeEnabled=true
+
+# Bookie Journal Settings
+writeBufferSizeBytes=262144
+journalFlushWhenQueueEmpty=false
+journalRemoveFromPageCache=true
+journalAdaptiveGroupWrites=true
+journalMaxGroupWaitMSec=4
+journalBufferedEntriesThreshold=180
+journalBufferedWritesThreshold=131072
+journalMaxGroupedEntriesToCommit=200
+journalPreAllocSizeMB=4
+
+# Sorted Ledger Storage Settings
+sortedLedgerStorageEnabled=true
+skipListSizeLimit=67108864
+skipListArenaChunkSize=2097152
+skipListArenaMaxAllocSize=131072
+fileInfoCacheInitialCapacity=10000
+fileInfoMaxIdleTime=3600
+
+# Bookie Threads Settings (NOTE: change this to align the cpu cores)
+numAddWorkerThreads=4
+numJournalCallbackThreads=4
+numReadWorkerThreads=4
+numLongPollWorkerThreads=4
+
+# stats
+statsProviderClass=org.apache.bookkeeper.stats.CodahaleMetricsServletProvider
+# Exporting codahale stats
+codahaleStatsHttpPort=9001
diff --git a/distributedlog-core/conf/dlogenv.sh b/distributedlog-core/conf/dlogenv.sh
index 11e4024..ae6c459 100644
--- a/distributedlog-core/conf/dlogenv.sh
+++ b/distributedlog-core/conf/dlogenv.sh
@@ -20,7 +20,9 @@
 # * limitations under the License.
 # */
 
-# default settings for starting distributedlog sandbox
+##################
+# General
+##################
 
 # Log4j configuration file
 # DLOG_LOG_CONF=
@@ -39,3 +41,37 @@
 
 # Configure the log file
 # DLOG_LOG_FILE=
+
+#################
+# ZooKeeper
+#################
+
+# Configure zookeeper root logger
+# ZK_ROOT_LOGGER=
+
+#################
+# Bookie
+#################
+
+# Configure bookie root logger
+# BK_ROOT_LOGGER=
+
+#################
+# Write Proxy
+#################
+
+# Configure write proxy root logger
+# WP_ROOT_LOGGER=
+
+# write proxy configuration file
+# WP_CONF_FILE=${DL_HOME}/conf/write_proxy.conf
+
+# port and stats port
+# WP_SERVICE_PORT=4181
+# WP_STATS_PORT=9000
+
+# shard id
+# WP_SHARD_ID=0
+
+# write proxy namespace
+# WP_NAMESPACE=distributedlog://127.0.0.1:2181/messaging/distributedlog/mynamespace
diff --git a/distributedlog-core/conf/distributedlog_proxy.conf b/distributedlog-core/conf/write_proxy.conf
similarity index 92%
rename from distributedlog-core/conf/distributedlog_proxy.conf
rename to distributedlog-core/conf/write_proxy.conf
index 72af987..3e58b31 100644
--- a/distributedlog-core/conf/distributedlog_proxy.conf
+++ b/distributedlog-core/conf/write_proxy.conf
@@ -51,7 +51,7 @@
 # use allocator pool for proxy
 enableLedgerAllocatorPool=true
 # ledger allocator pool path
-ledgerAllocatorPoolPath=.write_proxy_eventbus_high_throughput_allocation_pool
+ledgerAllocatorPoolPath=.write_proxy_allocation_pool
 # ledger allocator pool size
 ledgerAllocatorPoolCoreSize=40
 # check stream exists or not
@@ -110,11 +110,10 @@
 server_dlsn_version=1
 server_enable_perstream_stat=true
 server_graceful_shutdown_period_ms=35000
-stream_partition_converter_class=com.twitter.distributedlog.service.streamset.EventBusStreamPartitionConverter
 
 # write limits
 perWriterOutstandingWriteLimit=-1
-globalOutstandingWriteLimit=15000
+globalOutstandingWriteLimit=-1
 outstandingWriteLimitDarkmode=false
 
 # bytes per second limit applied at the host level (50MBps on 1Gib machines)
diff --git a/distributedlog-core/conf/zookeeper.conf.dynamic.template b/distributedlog-core/conf/zookeeper.conf.dynamic.template
new file mode 100644
index 0000000..61a1439
--- /dev/null
+++ b/distributedlog-core/conf/zookeeper.conf.dynamic.template
@@ -0,0 +1 @@
+server.1=127.0.0.1:2710:3710:participant;0.0.0.0:2181
\ No newline at end of file
diff --git a/distributedlog-core/conf/zookeeper.conf.template b/distributedlog-core/conf/zookeeper.conf.template
new file mode 100644
index 0000000..f244347
--- /dev/null
+++ b/distributedlog-core/conf/zookeeper.conf.template
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The number of milliseconds of each tick
+tickTime=2000
+
+# The number of ticks that the initial
+# synchronization phase can take
+initLimit=10
+
+# The number of ticks that can pass between
+# sending a request and getting an acknowledgement
+syncLimit=30
+
+# the directory where the snapshot is stored.
+dataDir=/tmp/data/zookeeper
+
+# where txlog  are written
+dataLogDir=/tmp/data/zookeeper/txlog
+
+# the port at which the admin will listen
+adminPort=9990
+zookeeper.admin.enableServer=true
+
+# limit on queued clients - default: 1000
+globalOutstandingLimit=1000
+
+# number of transactions before snapshots are taken - default: 100000
+snapCount=100000
+
+# max # of clients - 0==unlimited
+maxClientCnxns=25
+
+# Election implementation to use. A value of "0" corresponds to the original
+# UDP-based version, "1" corresponds to the non-authenticated UDP-based
+# version of fast leader election, "2" corresponds to the authenticated
+# UDP-based version of fast leader election, and "3" corresponds to TCP-based
+# version of fast leader election. Currently, only 0 and 3 are supported,
+# 3 being the default
+electionAlg=3
+
+# Leader accepts client connections. Default value is "yes". The leader
+# machine coordinates updates. For higher update throughput at the slight
+# expense of read throughput the leader can be configured to not accept
+# clients and focus on coordination.
+leaderServes=yes
+
+# Skips ACL checks. This results in a boost in throughput, but opens up full
+# access to the data tree to everyone.
+skipACL=no
+
+# Purge txn logs every hour. Before 3.4.x this was done with an external cron
+# job, now we can do it internally.
+autopurge.purgeInterval=1
+
+# Prior to version 3.4 ZooKeeper has always used NIO directly, however in
+# versions 3.4 and later Netty is supported as an option to NIO (replaces).
+# serverCnxnFactory=org.apache.zookeeper.server.NIOServerCnxnFactory
+
+standaloneEnabled=false
+# ZooKeeper Dynamic Reconfiguration
+# See: https://zookeeper.apache.org/doc/trunk/zookeeperReconfig.html
+#
+# standaloneEnabled=false
+# dynamicConfigFile=/path/to/zoo.cfg.dynamic
+#
+server.1=127.0.0.1:2710:3710:participant;0.0.0.0:2181
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
index e859bea..298a5ff 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
@@ -57,7 +57,7 @@
 
         // stats
         if (singleton) {
-            statsLogger.registerGauge("permits", new Gauge<Number>() {
+            statsLogger.registerGauge("num_permits", new Gauge<Number>() {
                 @Override
                 public Number getDefaultValue() {
                     return 0;
diff --git a/distributedlog-service/bin/dlog b/distributedlog-service/bin/dlog
index f7b1e96..07bc047 100755
--- a/distributedlog-service/bin/dlog
+++ b/distributedlog-service/bin/dlog
@@ -50,6 +50,7 @@
 BINDIR=`dirname "$0"`
 DLOG_HOME=`cd $BINDIR/.. > /dev/null;pwd`
 
+DEFAULT_BK_CONF=$DLOG_HOME/conf/bookie.conf
 DEFAULT_LOG_CONF=$DLOG_HOME/conf/log4j.properties
 
 source $DLOG_HOME/conf/dlogenv.sh
@@ -80,6 +81,8 @@
     proxy_tool          Run distributedlog proxy tool to interact with proxies
     balancer            Run distributedlog balancer
     admin               Run distributedlog admin tool
+    zkshell             Run zookeeper shell
+    bkshell             Run bookkeeper shell
     help                This help message
 
 or command is the full name of a class with a defined main() method.
@@ -88,6 +91,7 @@
    DLOG_LOG_CONF        Log4j configuration file (default $DEFAULT_LOG_CONF)
    DLOG_EXTRA_OPTS      Extra options to be passed to the jvm
    DLOG_EXTRA_CLASSPATH Add extra paths to the dlog classpath
+   BOOKIE_CONF          Bookie Configuration file (default: $DEFAULT_BK_CONF)
 
 These variable can also be set in conf/dlogenv.sh
 EOF
@@ -132,6 +136,10 @@
     DLOG_LOG_CONF=$DEFAULT_LOG_CONF
 fi
 
+if [ -z "$BOOKIE_CONF" ]; then
+    BOOKIE_CONF=$DEFAULT_BK_CONF
+fi
+
 DLOG_CLASSPATH="$DLOG_JAR:$DLOG_CLASSPATH:$DLOG_EXTRA_CLASSPATH"
 if [ "$DLOG_LOG_CONF" != "" ]; then
     DLOG_CLASSPATH="`dirname $DLOG_LOG_CONF`:$DLOG_CLASSPATH"
@@ -166,6 +174,11 @@
     exec java $OPTS com.twitter.distributedlog.tools.Tool com.twitter.distributedlog.service.balancer.BalancerTool $@
 elif [ $COMMAND == "admin" ]; then
     exec java $OPTS com.twitter.distributedlog.tools.Tool com.twitter.distributedlog.admin.DistributedLogAdmin $@
+elif [ $COMMAND == "zkshell" ]; then
+    exec java $OPTS org.apache.zookeeper.ZooKeeperMain -server $@
+elif [ $COMMAND == "bkshell" ]; then
+    ENTRY_FORMATTER_ARG="-DentryFormatterClass=${ENTRY_FORMATTER_CLASS:-org.apache.bookkeeper.util.StringEntryFormatter}"
+    exec java $OPTS $ENTRY_FORMATTER_ARG org.apache.bookkeeper.bookie.BookieShell -conf $BOOKIE_CONF $@
 elif [ $COMMAND == "help" ]; then
     dlog_help;
 else
diff --git a/distributedlog-service/bin/dlog-daemon.sh b/distributedlog-service/bin/dlog-daemon.sh
index e95c06e..e683da0 100755
--- a/distributedlog-service/bin/dlog-daemon.sh
+++ b/distributedlog-service/bin/dlog-daemon.sh
@@ -41,6 +41,9 @@
  . $DL_HOME/conf/dlogenv.sh
 fi
 
+SERVICE_PORT=${SERVICE_PORT:-"0"}
+SERVICE_ARGS=""
+
 # DLOG logging configuration
 DLOG_LOG_DIR=${DLOG_LOG_DIR:-"$DL_HOME/logs"}
 DLOG_ROOT_LOGGER=${DLOG_ROOT_LOGGER:-'INFO,R'}
@@ -64,15 +67,28 @@
 case $service in
     (zookeeper)
         service_class="org.apache.zookeeper.server.quorum.QuorumPeerMain"
+        DLOG_ROOT_LOGGER=${ZK_ROOT_LOGGER:-'INFO,R'}
         ;;
     (bookie)
         service_class="org.apache.bookkeeper.proto.BookieServer"
+        DLOG_ROOT_LOGGER=${BK_ROOT_LOGGER:-'INFO,R'}
         ;;
     (bookie-rereplicator)
         service_class="org.apache.bookkeeper.replication.AutoRecoveryMain"
+        DLOG_ROOT_LOGGER=${BK_ROOT_LOGGER:-'INFO,R'}
         ;;
     (writeproxy)
         service_class="com.twitter.distributedlog.service.DistributedLogServerApp"
+        DLOG_ROOT_LOGGER=${WP_ROOT_LOGGER:-'INFO,R'}
+        WP_CONF_FILE=${WP_CONF_FILE:-"$DL_HOME/conf/write_proxy.conf"}
+        WP_SERVICE_PORT=${WP_SERVICE_PORT:-'4181'}
+        WP_STATS_PORT=${WP_STATS_PORT:-'9000'}
+        WP_STATS_PROVIDER=${WP_STATS_PROVIDER:-'org.apache.bookkeeper.stats.CodahaleMetricsServletProvider'}
+        WP_SHARD_ID=${WP_SHARD_ID:-'0'}
+        WP_NAMESPACE=${WP_NAMESPACE:-'distributedlog://127.0.0.1:2181/messaging/distributedlog/mynamespace'}
+        SERVICE_PORT=${WP_SERVICE_PORT}
+        SERVICE_ARGS="--conf ${WP_CONF_FILE} --uri ${WP_NAMESPACE} --shard-id ${WP_SHARD_ID} --port ${WP_SERVICE_PORT} --stats-port ${WP_STATS_PORT} --stats-provider ${WP_STATS_PROVIDER} --announce --thriftmux"
+        DLOG_EXTRA_OPTS="${DLOG_EXTRA_OPTS} -DcodahaleStatsHttpPort=${WP_STATS_PORT} -Dserver_port=${WP_SERVICE_PORT} -Dserver_shard=${WP_SHARD_ID}"
         ;;
     (writeproxy-monitor)
         ;;
@@ -87,10 +103,11 @@
 
 export DLOG_LOG_DIR=$DLOG_LOG_DIR
 export DLOG_ROOT_LOGGER=$DLOG_ROOT_LOGGER
-export DLOG_LOG_FILE=dlog-$service-$HOSTNAME.log
+export DLOG_LOG_FILE=dlog-$service-$HOSTNAME-$SERVICE_PORT.log
+export DLOG_EXTRA_OPTS=$DLOG_EXTRA_OPTS
 
-pid=$DLOG_PID_DIR/dlog-$service.pid
-out=$DLOG_LOG_DIR/dlog-$service-$HOSTNAME.out
+pid=$DLOG_PID_DIR/dlog-$service-$SERVICE_PORT.pid
+out=$DLOG_LOG_DIR/dlog-$service-$HOSTNAME-$SERVICE_PORT.out
 logfile=$DLOG_LOG_DIR/$DLOG_LOG_FILE
 
 rotate_out_log ()
@@ -125,7 +142,7 @@
     rotate_out_log $out
     echo starting $service, logging to $logfile
     dlog=$DL_HOME/bin/dlog
-    nohup $dlog $service_class "$@" > "$out" 2>&1 < /dev/null &
+    nohup $dlog $service_class ${SERVICE_ARGS} "$@" > "$out" 2>&1 < /dev/null &
     echo $! > $pid
     sleep 1; head $out
     sleep 2;
diff --git a/distributedlog-service/pom.xml b/distributedlog-service/pom.xml
index f9e96ab..7660f5e 100644
--- a/distributedlog-service/pom.xml
+++ b/distributedlog-service/pom.xml
@@ -63,7 +63,21 @@
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>${zookeeper.version}</version>
-      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>jdiff</groupId>
+      <artifactId>jdiff</artifactId>
+      <version>1.0.9</version>
+    </dependency>
+    <dependency>
+      <groupId>jline</groupId>
+      <artifactId>jline</artifactId>
+      <version>2.14.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.9.11</version>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
@@ -77,7 +91,27 @@
       <version>1.9.5</version>
       <scope>test</scope>
     </dependency> 
- </dependencies>
+    <dependency>
+      <groupId>org.apache.bookkeeper.stats</groupId>
+      <artifactId>codahale-metrics-provider</artifactId>
+      <version>${bookkeeper.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-servlets</artifactId>
+      <version>${codahale.metrics.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+  </dependencies>
   <build>
     <plugins>
       <plugin>
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
index 4bd9702..6ef99b8 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
@@ -86,7 +86,7 @@
     private final Optional<Integer> port;
     private final Optional<Integer> statsPort;
     private final Optional<Integer> shardId;
-    private final Optional<String> announcePath;
+    private final Optional<Boolean> announceServerSet;
     private final Optional<Boolean> thriftmux;
 
     DistributedLogServer(Optional<String> uri,
@@ -95,7 +95,7 @@
                          Optional<Integer> port,
                          Optional<Integer> statsPort,
                          Optional<Integer> shardId,
-                         Optional<String> announcePath,
+                         Optional<Boolean> announceServerSet,
                          Optional<Boolean> thriftmux,
                          StatsReceiver statsReceiver,
                          StatsProvider statsProvider) {
@@ -105,7 +105,7 @@
         this.port = port;
         this.statsPort = statsPort;
         this.shardId = shardId;
-        this.announcePath = announcePath;
+        this.announceServerSet = announceServerSet;
         this.thriftmux = thriftmux;
         this.statsReceiver = statsReceiver;
         this.statsProvider = statsProvider;
@@ -151,7 +151,7 @@
         logger.info("Starting stats provider : {}", statsProvider.getClass());
         statsProvider.start(dlConf);
 
-        if (announcePath.isPresent()) {
+        if (announceServerSet.isPresent() && announceServerSet.get()) {
             announcer = new ServerSetAnnouncer(
                     dlUri,
                     port.or(0),
@@ -378,7 +378,7 @@
      * @param port listen port
      * @param statsPort stats port
      * @param shardId shard id
-     * @param announcePath server set announce path
+     * @param announceServerSet whether to announce itself to server set
      * @param thriftmux flag to enable thrift mux
      * @param statsReceiver receiver to receive finagle stats
      * @param statsProvider provider to receive dl stats
@@ -394,7 +394,7 @@
                Optional<Integer> port,
                Optional<Integer> statsPort,
                Optional<Integer> shardId,
-               Optional<String> announcePath,
+               Optional<Boolean> announceServerSet,
                Optional<Boolean> thriftmux,
                StatsReceiver statsReceiver,
                StatsProvider statsProvider)
@@ -407,7 +407,7 @@
                 port,
                 statsPort,
                 shardId,
-                announcePath,
+                announceServerSet,
                 thriftmux,
                 statsReceiver,
                 statsProvider);
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
index 32e72d6..3a9a987 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
@@ -17,10 +17,14 @@
  */
 package com.twitter.distributedlog.service;
 
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.finagle.stats.NullStatsReceiver;
 import com.twitter.finagle.stats.StatsReceiver;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -30,7 +34,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
+import java.net.MalformedURLException;
 import java.util.Arrays;
 
 import static com.twitter.distributedlog.util.CommandLineUtils.*;
@@ -52,8 +59,9 @@
         options.addOption("sc", "stream-conf", true, "Per Stream Configuration Directory");
         options.addOption("p", "port", true, "DistributedLog Server Port");
         options.addOption("sp", "stats-port", true, "DistributedLog Stats Port");
+        options.addOption("pd", "stats-provider", true, "DistributedLog Stats Provider");
         options.addOption("si", "shard-id", true, "DistributedLog Shard ID");
-        options.addOption("a", "announce", true, "ServerSet Path to Announce");
+        options.addOption("a", "announce", false, "ServerSet Path to Announce");
         options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
     }
 
@@ -88,16 +96,38 @@
 
     private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException {
         final StatsReceiver statsReceiver = NullStatsReceiver.get();
-        final StatsProvider statsProvider = new NullStatsProvider();
+        Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+        if (confOptional.isPresent()) {
+            String configFile = confOptional.get();
+            try {
+                dlConf.loadConf(new File(configFile).toURI().toURL());
+            } catch (ConfigurationException e) {
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from " + configFile + ".");
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+                        + configFile + ".");
+            }
+        }
+        // load the stats provider
+        final StatsProvider statsProvider = getOptionalStringArg(cmdline, "pd")
+                .transform(new Function<String, StatsProvider>() {
+                    @Nullable
+                    @Override
+                    public StatsProvider apply(@Nullable String name) {
+                        return ReflectionUtils.newInstance(name, StatsProvider.class);
+                    }
+                }).or(new NullStatsProvider());
+        statsProvider.start(dlConf);
 
         final DistributedLogServer server = DistributedLogServer.runServer(
                 getOptionalStringArg(cmdline, "u"),
-                getOptionalStringArg(cmdline, "c"),
+                confOptional,
                 getOptionalStringArg(cmdline, "sc"),
                 getOptionalIntegerArg(cmdline, "p"),
                 getOptionalIntegerArg(cmdline, "sp"),
                 getOptionalIntegerArg(cmdline, "si"),
-                getOptionalStringArg(cmdline, "a"),
+                getOptionalBooleanArg(cmdline, "a"),
                 getOptionalBooleanArg(cmdline, "mx"),
                 statsReceiver,
                 statsProvider);
@@ -108,6 +138,7 @@
                 logger.info("Closing DistributedLog Server.");
                 server.close();
                 logger.info("Closed DistributedLog Server.");
+                statsProvider.stop();
             }
         });
 
@@ -120,6 +151,7 @@
         logger.info("DistributedLog Service Interrupted.");
         server.close();
         logger.info("Closed DistributedLog Server.");
+        statsProvider.stop();
     }
 
     public static void main(String[] args) {
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java
new file mode 100644
index 0000000..c08f0f0
--- /dev/null
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.stats;
+
+import com.codahale.metrics.health.HealthCheckRegistry;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extend the codahale metrics provider to run servlets
+ */
+public class CodahaleMetricsServletProvider extends CodahaleMetricsProvider {
+
+    private final static Logger logger = LoggerFactory.getLogger(CodahaleMetricsServletProvider.class);
+
+    ServletReporter servletReporter = null;
+    private final HealthCheckRegistry healthCheckRegistry = new HealthCheckRegistry();
+
+    @Override
+    public void start(Configuration conf) {
+        super.start(conf);
+        Integer httpPort = conf.getInteger("codahaleStatsHttpPort", null);
+        if (null != httpPort) {
+            servletReporter = new ServletReporter(
+                    getMetrics(),
+                    healthCheckRegistry,
+                    httpPort);
+            try {
+                servletReporter.start();
+            } catch (Exception e) {
+                logger.warn("Encountered error on starting the codahale metrics servlet", e);
+            }
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (null != servletReporter) {
+            try {
+                servletReporter.stop();
+            } catch (Exception e) {
+                logger.error("Encountered error on stopping the codahale metrics servlet", e);
+            }
+        }
+        super.stop();
+    }
+}
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java
new file mode 100644
index 0000000..83b4995
--- /dev/null
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java
@@ -0,0 +1,21 @@
+package org.apache.bookkeeper.stats;
+
+import com.codahale.metrics.health.HealthCheckRegistry;
+import com.codahale.metrics.servlets.HealthCheckServlet;
+
+/**
+ * Health Check Servlet Listener
+ */
+public class HealthCheckServletContextListener extends HealthCheckServlet.ContextListener {
+
+    private final HealthCheckRegistry healthCheckRegistry;
+
+    public HealthCheckServletContextListener(HealthCheckRegistry healthCheckRegistry) {
+        this.healthCheckRegistry = healthCheckRegistry;
+    }
+
+    @Override
+    protected HealthCheckRegistry getHealthCheckRegistry() {
+        return healthCheckRegistry;
+    }
+}
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java
new file mode 100644
index 0000000..bbed58c
--- /dev/null
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java
@@ -0,0 +1,18 @@
+package org.apache.bookkeeper.stats;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.servlets.MetricsServlet;
+
+public class MetricsServletContextListener extends MetricsServlet.ContextListener {
+
+    private final MetricRegistry metricRegistry;
+
+    public MetricsServletContextListener(MetricRegistry metricRegistry) {
+        this.metricRegistry = metricRegistry;
+    }
+
+    @Override
+    protected MetricRegistry getMetricRegistry() {
+        return metricRegistry;
+    }
+}
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
new file mode 100644
index 0000000..9cf0610
--- /dev/null
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.stats;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.health.HealthCheckRegistry;
+import com.codahale.metrics.servlets.AdminServlet;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+/**
+ * Starts a jetty server on a configurable port to export stats
+ */
+public class ServletReporter {
+
+    private final MetricRegistry metricRegistry;
+    private final HealthCheckRegistry healthCheckRegistry;
+    private final int port;
+    private final Server jettyServer;
+
+    public ServletReporter(MetricRegistry metricRegistry,
+                           HealthCheckRegistry healthCheckRegistry,
+                           int port) {
+        this.metricRegistry = metricRegistry;
+        this.healthCheckRegistry = healthCheckRegistry;
+        this.port = port;
+        this.jettyServer = new Server(port);
+    }
+
+    public void start() throws Exception {
+        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+        context.setContextPath("/");
+        jettyServer.setHandler(context);
+
+        context.addEventListener(new HealthCheckServletContextListener(healthCheckRegistry));
+        context.addEventListener(new MetricsServletContextListener(metricRegistry));
+        context.addServlet(new ServletHolder(new AdminServlet()), "/*");
+
+        jettyServer.start();
+    }
+
+    public void stop() throws Exception {
+        jettyServer.stop();
+    }
+
+}
diff --git a/docs/operations/bookkeeper.rst b/docs/operations/bookkeeper.rst
new file mode 100644
index 0000000..5a35ba9
--- /dev/null
+++ b/docs/operations/bookkeeper.rst
@@ -0,0 +1,193 @@
+BookKeeper
+==========
+
+For reliable BookKeeper service, you should deploy BookKeeper in a cluster.
+
+Run from bookkeeper source
+--------------------------
+
+The version of BookKeeper that DistributedLog depends on is not the official opensource version.
+It is twitter's production version `4.3.4-TWTTR`, which is available in `https://github.com/twitter/bookkeeper`. 
+We are working actively with BookKeeper community to merge all twitter's changes back to the community.
+
+The major changes in Twitter's bookkeeper includes:
+
+- BOOKKEEPER-670_: Long poll reads and LastAddConfirmed piggyback. It is to reduce the tailing read latency.
+- BOOKKEEPER-759_: Delay ensemble change if it doesn't break ack quorum constraint. It is to reduce the write latency on bookie failures.
+- BOOKKEEPER-757_: Ledger recovery improvements, to reduce the latency on ledger recovery.
+- Misc improvements on bookie recovery and bookie storage.
+
+.. _BOOKKEEPER-670: https://issues.apache.org/jira/browse/BOOKKEEPER-670
+.. _BOOKKEEPER-759: https://issues.apache.org/jira/browse/BOOKKEEPER-759
+.. _BOOKKEEPER-757: https://issues.apache.org/jira/browse/BOOKKEEPER-757
+
+To build bookkeeper, run:
+
+1. First checkout the bookkeeper source code from twitter's branch.
+
+.. code-block:: bash
+
+    $ git clone https://github.com/twitter/bookkeeper.git bookkeeper   
+
+
+2. Build the bookkeeper package:
+
+.. code-block:: bash
+
+    $ cd bookkeeper 
+    $ mvn clean package assembly:single -DskipTests
+
+However, since `bookkeeper-server` is one of the dependencies of `distributedlog-service`,
+you could simply run bookkeeper using the same set of scripts provided in `distributedlog-service`.
+In the following sections, we will describe how to run bookkeeper using the scripts provided in
+`distributedlog-service`.
+
+Run from distributedlog source
+------------------------------
+
+Build
++++++
+
+First of all, build DistributedLog:
+
+.. code-block:: bash
+
+    $ mvn clean install -DskipTests
+
+
+Configuration
++++++++++++++
+
+The configuration file `bookie.conf` under `distributedlog-service/conf` is a template of production
+configuration to run a bookie node. Most of the configuration settings are good for production usage.
+You might need to configure following settings according to your environment and hardware platform.
+
+Port
+^^^^
+
+By default, the service port is `3181`, where the bookie server listens on. You can change the port
+to whatever port you like by modifying the following setting.
+
+::
+
+    bookiePort=3181
+
+
+Disks
+^^^^^
+
+You need to configure following settings according to the disk layout of your hardware. It is recommended
+to put `journalDirectory` on a separate disk from the others for performance. It is okay to set
+`indexDirectories` to be same as `ledgerDirectories`. However, it is recommended to put `indexDirectories`
+on an SSD drive for better performance.
+
+::
+    
+    # Directory Bookkeeper outputs its write ahead log
+    journalDirectory=/tmp/data/bk/journal
+
+    # Directory Bookkeeper outputs ledger snapshots
+    ledgerDirectories=/tmp/data/bk/ledgers
+
+    # Directory in which index files will be stored.
+    indexDirectories=/tmp/data/bk/ledgers
+
+
+To better understand how bookie nodes work, please check bookkeeper_ website for more details.
+
+ZooKeeper
+^^^^^^^^^
+
+You need to configure following settings to point the bookie to the zookeeper server that it is using.
+You need to make sure `zkLedgersRootPath` exists before starting the bookies.
+
+::
+   
+    # Root zookeeper path to store ledger metadata
+    # This parameter is used by zookeeper-based ledger manager as a root znode to
+    # store all ledgers.
+    zkLedgersRootPath=/messaging/bookkeeper/ledgers
+    # A list of one or more servers on which zookeeper is running.
+    zkServers=localhost:2181
+
+
+Stats Provider
+^^^^^^^^^^^^^^
+
+Bookies use `StatsProvider` to expose their metrics. The `StatsProvider` is a pluggable library to
+adapt to various stats collecting systems. Please check :doc:`monitoring` for more details.
+
+::
+    
+    # stats provide - use `codahale` metrics library
+    statsProviderClass=org.apache.bookkeeper.stats.CodahaleMetricsServletProvider
+
+    ### Following settings are stats provider related settings
+
+    # Exporting codahale stats in http port `9001`
+    codahaleStatsHttpPort=9001
+
+
+Index Settings
+^^^^^^^^^^^^^^
+
+- `pageSize`: size of a index page in ledger cache, in bytes. If there are large number
+  of ledgers and each ledger has fewer entries, smaller index page would improve memory usage.
+- `pageLimit`: The maximum number of index pages in ledger cache. If the number of index pages
+  reaches the limitation, bookie server starts to swap some ledgers from memory to disk.
+  Increase this value when swap becomes more frequent. But make sure `pageLimit*pageSize`
+  should not be more than JVM max memory limitation.
+
+
+Journal Settings
+^^^^^^^^^^^^^^^^
+
+- `journalMaxGroupWaitMSec`: The maximum wait time for group commit. It is valid only when
+  `journalFlushWhenQueueEmpty` is false.
+- `journalFlushWhenQueueEmpty`: Flag indicates whether to flush/sync journal. If it is `true`,
+  bookie server will sync journal when there is no other writes in the journal queue.
+- `journalBufferedWritesThreshold`: The maximum buffered writes for group commit, in bytes.
+  It is valid only when `journalFlushWhenQueueEmpty` is false.
+- `journalBufferedEntriesThreshold`: The maximum buffered writes for group commit, in entries.
+  It is valid only when `journalFlushWhenQueueEmpty` is false.
+
+Setting `journalFlushWhenQueueEmpty` to `true` will produce low latency when the traffic is low.
+However, the latency varies a lot when the traffic is increased. So it is recommended to set
+`journalMaxGroupWaitMSec`, `journalBufferedEntriesThreshold` and `journalBufferedWritesThreshold`
+to reduce the number of fsyncs made to journal disk, to achieve sustained low latency.
+
+Thread Settings
+^^^^^^^^^^^^^^^
+
+It is recommended to configure following settings to align with the cpu cores of the hardware.
+
+::
+    
+    numAddWorkerThreads=4
+    numJournalCallbackThreads=4
+    numReadWorkerThreads=4
+    numLongPollWorkerThreads=4
+
+Run 
++++
+
+As `bookkeeper-server` is shipped as part of `distributedlog-service`, you could use the `dlog-daemon.sh`
+script to start `bookie` as a daemon.
+
+Start the bookie:
+
+.. code-block:: bash
+
+    $ ./distributedlog-service/bin/dlog-daemon.sh start bookie --conf /path/to/bookie/conf
+
+
+Stop the bookie:
+
+.. code-block:: bash
+
+    $ ./distributedlog-service/bin/dlog-daemon.sh stop bookie
+
+
+Please check bookkeeper_ website for more details.
+
+.. _bookkeeper: http://bookkeeper.apache.org/
diff --git a/docs/operations/deployment.rst b/docs/operations/deployment.rst
index 5e9d564..461ac95 100644
--- a/docs/operations/deployment.rst
+++ b/docs/operations/deployment.rst
@@ -1,6 +1,7 @@
 Cluster Setup & Deployment
 ==========================
 
+This section describes how to run DistributedLog in `distributed` mode.
 To run a cluster with DistributedLog, you need a Zookeeper cluster and a Bookkeeper cluster.
 
 Build
@@ -10,16 +11,524 @@
 
 .. code-block:: bash
 
-   mvn package
+   mvn clean install -DskipTests
 
-This will generate a zip file in ``distributedlog-service/target``. This zip file contains the main JAR to run DistributedLog, and all the dependencies.
+
+Or run `./scripts/snapshot` to build the release packages from current source. The released
+packages contain the binaries for running `distributedlog-service`, `distributedlog-benchmark`
+and `distributedlog-tutorials`.
+
+NOTE: we run the following instructions from the distributedlog source code after running `mvn clean install`.
+And assume `DL_HOME` is the directory of the distributedlog source.
 
 Zookeeper
 ---------
 
-Please refer to the :doc:`/zookeeper` configuration.
+(If you already have a zookeeper cluster running, you could skip this section.)
+
+We could use the `dlog-daemon.sh` and the `zookeeper.conf.template` to demonstrate running a 1-node
+zookeeper ensemble locally.
+
+Create a `zookeeper.conf` from the `zookeeper.conf.template`.
+
+.. code-block:: bash
+
+    $ cp distributedlog-service/conf/zookeeper.conf.template distributedlog-service/conf/zookeeper.conf
+
+Configure the settings in `zookeeper.conf`. By default, it will use `/tmp/data/zookeeper` for storing
+the zookeeper data. Let's create the data directories for zookeeper.
+
+.. code-block:: bash
+
+    $ mkdir -p /tmp/data/zookeeper/txlog
+
+Once the data directory is created, we need to assign `myid` for this zookeeper node.
+
+.. code-block:: bash
+
+    $ echo "1" > /tmp/data/zookeeper/myid
+
+Start the zookeeper daemon using `dlog-daemon.sh`.
+
+.. code-block:: bash
+
+    $ ./distributedlog-service/bin/dlog-daemon.sh start zookeeper ${DL_HOME}/distributedlog-service/conf/zookeeper.conf
+
+You could verify the zookeeper setup using `zkshell`.
+
+.. code-block:: bash
+
+    // ./distributedlog-service/bin/dlog zkshell ${zkservers}
+    $ ./distributedlog-service/bin/dlog zkshell localhost:2181
+    Connecting to localhost:2181
+    Welcome to ZooKeeper!
+    JLine support is enabled
+
+    WATCHER::
+
+    WatchedEvent state:SyncConnected type:None path:null
+    [zk: localhost:2181(CONNECTED) 0] ls /
+    [zookeeper]
+    [zk: localhost:2181(CONNECTED) 1]
+
+Please refer to the :doc:`zookeeper` for more details on setting up zookeeper cluster.
 
 Bookkeeper
 ----------
 
-There is no minimal instances needed to run a Bookkeeper cluster.
+(If you already have a bookkeeper cluster running, you could skip this section.)
+
+We could use the `dlog-daemon.sh` and the `bookie.conf.template` to demonstrate running a 3-node
+bookkeeper cluster locally.
+
+Create a `bookie.conf` from the `bookie.conf.template`. Since we are going to run a 3-node
+bookkeeper cluster locally, let's make three copies of `bookie.conf.template`.
+
+.. code-block:: bash
+
+    $ cp distributedlog-service/conf/bookie.conf.template distributedlog-service/conf/bookie-1.conf
+    $ cp distributedlog-service/conf/bookie.conf.template distributedlog-service/conf/bookie-2.conf
+    $ cp distributedlog-service/conf/bookie.conf.template distributedlog-service/conf/bookie-3.conf
+
+Configure the settings in the bookie configuration files.
+
+First of all, choose the zookeeper cluster that the bookies will use and set `zkServers` in
+the configuration files.
+
+::
+    
+    zkServers=localhost:2181
+
+Choose the zookeeper path to store bookkeeper metadata and set `zkLedgersRootPath` in the configuration
+files. Let's use `/messaging/bookkeeper/ledgers` in this instruction.
+
+::
+
+    zkLedgersRootPath=/messaging/bookkeeper/ledgers
+
+
+Format bookkeeper metadata
+++++++++++++++++++++++++++
+
+(NOTE: only format bookkeeper metadata when first time setting up the bookkeeper cluster.)
+
+The bookkeeper shell doesn't automatically create the `zkLedgersRootPath` when running `metaformat`.
+So use `zkshell` to create the `zkLedgersRootPath`.
+
+::
+
+    $ ./distributedlog-service/bin/dlog zkshell localhost:2181
+    Connecting to localhost:2181
+    Welcome to ZooKeeper!
+    JLine support is enabled
+
+    WATCHER::
+
+    WatchedEvent state:SyncConnected type:None path:null
+    [zk: localhost:2181(CONNECTED) 0] create /messaging ''
+    Created /messaging
+    [zk: localhost:2181(CONNECTED) 1] create /messaging/bookkeeper ''
+    Created /messaging/bookkeeper
+    [zk: localhost:2181(CONNECTED) 2] create /messaging/bookkeeper/ledgers ''
+    Created /messaging/bookkeeper/ledgers
+    [zk: localhost:2181(CONNECTED) 3]
+
+
+If the `zkLedgersRootPath` is created, run `metaformat` to format the bookkeeper metadata.
+
+::
+    
+    $ BOOKIE_CONF=${DL_HOME}/distributedlog-service/conf/bookie-1.conf ./distributedlog-service/bin/dlog bkshell metaformat
+    Are you sure to format bookkeeper metadata ? (Y or N) Y
+
+Add Bookies
++++++++++++
+
+Once the bookkeeper metadata is formatted, it is ready to add bookie nodes to the cluster.
+
+Configure Ports
+^^^^^^^^^^^^^^^
+
+Configure the ports that used by bookies.
+
+bookie-1:
+
+::
+   
+    # Port that bookie server listen on
+    bookiePort=3181
+    # Exporting codahale stats
+    codahaleStatsHttpPort=9001
+
+bookie-2:
+
+::
+   
+    # Port that bookie server listen on
+    bookiePort=3182
+    # Exporting codahale stats
+    codahaleStatsHttpPort=9002
+
+bookie-3:
+
+::
+   
+    # Port that bookie server listen on
+    bookiePort=3183
+    # Exporting codahale stats
+    codahaleStatsHttpPort=9003
+
+Configure Disk Layout
+^^^^^^^^^^^^^^^^^^^^^
+
+Configure the disk directories used by a bookie server by setting following options.
+
+::
+    
+    # Directory Bookkeeper outputs its write ahead log
+    journalDirectory=/tmp/data/bk/journal
+    # Directory Bookkeeper outputs ledger snapshots
+    ledgerDirectories=/tmp/data/bk/ledgers
+    # Directory in which index files will be stored.
+    indexDirectories=/tmp/data/bk/ledgers
+
+As we are configuring a 3-nodes bookkeeper cluster, we modify the following settings as below:
+
+bookie-1:
+
+::
+    
+    # Directory Bookkeeper outputs its write ahead log
+    journalDirectory=/tmp/data/bk-1/journal
+    # Directory Bookkeeper outputs ledger snapshots
+    ledgerDirectories=/tmp/data/bk-1/ledgers
+    # Directory in which index files will be stored.
+    indexDirectories=/tmp/data/bk-1/ledgers
+
+bookie-2:
+
+::
+    
+    # Directory Bookkeeper outputs its write ahead log
+    journalDirectory=/tmp/data/bk-2/journal
+    # Directory Bookkeeper outputs ledger snapshots
+    ledgerDirectories=/tmp/data/bk-2/ledgers
+    # Directory in which index files will be stored.
+    indexDirectories=/tmp/data/bk-2/ledgers
+
+bookie-3:
+
+::
+    
+    # Directory Bookkeeper outputs its write ahead log
+    journalDirectory=/tmp/data/bk-3/journal
+    # Directory Bookkeeper outputs ledger snapshots
+    ledgerDirectories=/tmp/data/bk-3/ledgers
+    # Directory in which index files will be stored.
+    indexDirectories=/tmp/data/bk-3/ledgers
+
+Format bookie
+^^^^^^^^^^^^^
+
+Once the disk directories are configured correctly in the configuration file, use
+`bkshell bookieformat` to format the bookie.
+
+::
+    
+    BOOKIE_CONF=${DL_HOME}/distributedlog-service/conf/bookie-1.conf ./distributedlog-service/bin/dlog bkshell bookieformat
+    BOOKIE_CONF=${DL_HOME}/distributedlog-service/conf/bookie-2.conf ./distributedlog-service/bin/dlog bkshell bookieformat
+    BOOKIE_CONF=${DL_HOME}/distributedlog-service/conf/bookie-3.conf ./distributedlog-service/bin/dlog bkshell bookieformat
+
+
+Start bookie
+^^^^^^^^^^^^
+
+Start the bookie using `dlog-daemon.sh`.
+
+::
+    
+    SERVICE_PORT=3181 ./distributedlog-service/bin/dlog-daemon.sh start bookie --conf ${DL_HOME}/distributedlog-service/conf/bookie-1.conf
+    SERVICE_PORT=3182 ./distributedlog-service/bin/dlog-daemon.sh start bookie --conf ${DL_HOME}/distributedlog-service/conf/bookie-2.conf
+    SERVICE_PORT=3183 ./distributedlog-service/bin/dlog-daemon.sh start bookie --conf ${DL_HOME}/distributedlog-service/conf/bookie-3.conf
+    
+Verify whether the bookie is set up correctly. You could simply check whether the bookie has shown up in
+the zookeeper `zkLedgersRootPath`/available znode.
+
+::
+    
+    $ ./distributedlog-service/bin/dlog zkshell localhost:2181
+    Connecting to localhost:2181
+    Welcome to ZooKeeper!
+    JLine support is enabled
+
+    WATCHER::
+
+    WatchedEvent state:SyncConnected type:None path:null
+    [zk: localhost:2181(CONNECTED) 0] ls /messaging/bookkeeper/ledgers/available
+    [127.0.0.1:3181, 127.0.0.1:3182, 127.0.0.1:3183, readonly]
+    [zk: localhost:2181(CONNECTED) 1]
+
+
+Or check if the bookie is exposing the stats at port `codahaleStatsHttpPort`.
+
+::
+    
+    // ping the service
+    $ curl localhost:9001/ping
+    pong
+    // checking the stats
+    curl localhost:9001/metrics?pretty=true
+
+Stop bookie
+^^^^^^^^^^^
+
+Stop the bookie using `dlog-daemon.sh`.
+
+::
+    
+    $ ./distributedlog-service/bin/dlog-daemon.sh stop bookie
+    // Example:
+    $ SERVICE_PORT=3181 ./distributedlog-service/bin/dlog-daemon.sh stop bookie
+    doing stop bookie ...
+    stopping bookie
+    Shutdown is in progress... Please wait...
+    Shutdown completed.
+
+Turn bookie to readonly
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Start the bookie in `readonly` mode.
+
+::
+    
+    $ SERVICE_PORT=3181 ./distributedlog-service/bin/dlog-daemon.sh start bookie --conf ${DL_HOME}/distributedlog-service/conf/bookie-1.conf --readonly
+
+Verify if the bookie is running in `readonly` mode.
+
+::
+    
+    $ ./distributedlog-service/bin/dlog zkshell localhost:2181
+    Connecting to localhost:2181
+    Welcome to ZooKeeper!
+    JLine support is enabled
+
+    WATCHER::
+
+    WatchedEvent state:SyncConnected type:None path:null
+    [zk: localhost:2181(CONNECTED) 0] ls /messaging/bookkeeper/ledgers/available
+    [127.0.0.1:3182, 127.0.0.1:3183, readonly]
+    [zk: localhost:2181(CONNECTED) 1] ls /messaging/bookkeeper/ledgers/available/readonly
+    [127.0.0.1:3181]
+    [zk: localhost:2181(CONNECTED) 2]
+
+Please refer to the :doc:`bookkeeper` for more details on setting up bookkeeper cluster.
+
+Create Namespace
+----------------
+
+After setting up a zookeeper cluster and a bookkeeper cluster, you could provision DL namespaces
+for applications to use.
+
+Provisioning a DistributedLog namespace is accomplished via the `bind` command available in `dlog tool`.
+
+Namespace is bound by writing bookkeeper environment settings (e.g. the ledger path, bkLedgersZkPath,
+or the set of Zookeeper servers used by bookkeeper, bkZkServers) as metadata in the zookeeper path of
+the namespace DL URI. The DL library resolves the DL URI to determine which bookkeeper cluster it
+should read and write to. 
+
+The namespace binding has following features:
+
+- `Inheritance`: suppose `distributedlog://<zkservers>/messaging/distributedlog` is bound to bookkeeper
+  cluster `X`. All the streams created under `distributedlog://<zkservers>/messaging/distributedlog`,
+  will write to bookkeeper cluster `X`.
+- `Override`: suppose `distributedlog://<zkservers>/messaging/distributedlog` is bound to bookkeeper
+  cluster `X`. You want streams under `distributedlog://<zkservers>/messaging/distributedlog/S` write
+  to bookkeeper cluster `Y`. You could just bind `distributedlog://<zkservers>/messaging/distributedlog/S`
+  to bookkeeper cluster `Y`. The binding to `distributedlog://<zkservers>/messaging/distributedlog/S`
+  only affects streams under `distributedlog://<zkservers>/messaging/distributedlog/S`.
+
+Create namespace binding using `dlog tool`. For example, we create a namespace
+`distributedlog://127.0.0.1:2181/messaging/distributedlog/mynamespace` pointing to the
+bookkeeper cluster we just created above.
+
+::
+    
+    $ distributedlog-service/bin/dlog admin bind \\
+        -dlzr 127.0.0.1:2181 \\
+        -dlzw 127.0.0.1:2181 \\
+        -s 127.0.0.1:2181 \\
+        -bkzr 127.0.0.1:2181 \\
+        -l /messaging/bookkeeper/ledgers \\
+        -i false \\
+        -r true \\
+        -c \\
+        distributedlog://127.0.0.1:2181/messaging/distributedlog/mynamespace
+
+    No bookkeeper is bound to distributedlog://127.0.0.1:2181/messaging/distributedlog/mynamespace
+    Created binding on distributedlog://127.0.0.1:2181/messaging/distributedlog/mynamespace.
+
+
+- Configure the zookeeper cluster used for storing DistributedLog metadata: `-dlzr` and `-dlzw`.
+  Ideally `-dlzr` and `-dlzw` would be the same as the zookeeper servers in the distributedlog namespace uri.
+  However to scale zookeeper reads, the zookeeper observers sometimes are added in a different
+  domain name than participants. In such case, configuring `-dlzr` and `-dlzw` to different
+  zookeeper domain names would help isolating zookeeper write and read traffic.
+- Configure the zookeeper cluster used by bookkeeper for storing the metadata : `-bkzr` and `-s`.
+  Similar as `-dlzr` and `-dlzw`, you could configure the namespace to use different zookeeper
+  domain names for readers and writers to access bookkeeper metadata.
+- Configure the bookkeeper ledgers path: `-l`.
+- Configure the zookeeper path to store DistributedLog metadata. It is implicitly included as part
+  of namespace URI.
+
+Write Proxy
+-----------
+
+A write proxy cluster consists of multiple write proxies. They don't store any state locally. So they are
+mostly stateless and you can run as many of them as you need.
+
+Configuration
++++++++++++++
+
+Different from bookkeeper, DistributedLog tries not to configure any environment related settings
+in configuration files. Any environment related settings are stored and configured via `namespace binding`.
+The configuration file should contain non-environment related settings.
+
+There is a `write_proxy.conf` template file available under `distributedlog-service` module.
+
+Run write proxy
++++++++++++++++
+
+A write proxy could be started using `dlog-daemon.sh` script under `distributedlog-service`.
+
+::
+    
+    WP_SHARD_ID=${WP_SHARD_ID} WP_SERVICE_PORT=${WP_SERVICE_PORT} WP_STATS_PORT=${WP_STATS_PORT} ./distributedlog-service/bin/dlog-daemon.sh start writeproxy
+
+- `WP_SHARD_ID`: A non-negative integer. You don't need to guarantee uniqueness of shard id, as it is just an
+  indicator to the client for routing the requests. If you are running the `write proxy` using a cluster scheduler
+  like `aurora`, you could easily obtain a shard id and use that to configure `WP_SHARD_ID`.
+- `WP_SERVICE_PORT`: The port that write proxy listens on.
+- `WP_STATS_PORT`: The port that write proxy exposes stats to a http endpoint.
+
+Please check `distributedlog-service/conf/dlogenv.sh` for more environment variables on configuring write proxy.
+
+- `WP_CONF_FILE`: The path to the write proxy configuration file.
+- `WP_NAMESPACE`: The distributedlog namespace that the write proxy is serving for.
+
+For example, we start 3 write proxies locally and point to the namespace created above.
+
+::
+    
+    $ WP_SHARD_ID=1 WP_SERVICE_PORT=4181 WP_STATS_PORT=20001 ./distributedlog-service/bin/dlog-daemon.sh start writeproxy
+    $ WP_SHARD_ID=2 WP_SERVICE_PORT=4182 WP_STATS_PORT=20002 ./distributedlog-service/bin/dlog-daemon.sh start writeproxy
+    $ WP_SHARD_ID=3 WP_SERVICE_PORT=4183 WP_STATS_PORT=20003 ./distributedlog-service/bin/dlog-daemon.sh start writeproxy
+
+The write proxy will announce itself to the zookeeper path `.write_proxy` under the dl namespace path.
+
+We could verify that the write proxy is running correctly by checking the zookeeper path or checking its stats port.
+
+::
+
+    $ ./distributedlog-service/bin/dlog zkshell localhost:2181
+    Connecting to localhost:2181
+    Welcome to ZooKeeper!
+    JLine support is enabled
+
+    WATCHER::
+
+    WatchedEvent state:SyncConnected type:None path:null
+    [zk: localhost:2181(CONNECTED) 0] ls /messaging/distributedlog/mynamespace/.write_proxy
+    [member_0000000000, member_0000000001, member_0000000002]
+
+
+::
+    
+    $ curl localhost:20001/ping
+    pong
+
+
+Add and Remove Write Proxies
+++++++++++++++++++++++++++++
+
+Removing a write proxy is pretty straightforward by just killing the process.
+
+::
+    
+    WP_SHARD_ID=1 WP_SERVICE_PORT=4181 WP_STATS_PORT=10001 ./distributedlog-service/bin/dlog-daemon.sh stop writeproxy
+
+
+Adding a new write proxy is just adding a new host and starting the write proxy
+process as described above.
+
+Write Proxy Naming
+++++++++++++++++++
+
+The `dlog-daemon.sh` script starts the write proxy by announcing it to the `.write_proxy` path under
+the dl namespace. So you could use `zk!<zkservers>!/<namespace_path>/.write_proxy` as the finagle name
+to access the write proxy cluster. It is `zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy`
+in the above example.
+
+Verify the setup
+++++++++++++++++
+
+You could verify the write proxy cluster by running tutorials over the setup cluster.
+
+Create 10 streams.
+
+::
+    
+    $ ./distributedlog-service/bin/dlog tool create -u distributedlog://127.0.0.1:2181/messaging/distributedlog/mynamespace -r stream- -e 0-10
+    You are going to create streams : [stream-0, stream-1, stream-2, stream-3, stream-4, stream-5, stream-6, stream-7, stream-8, stream-9, stream-10] (Y or N) Y
+
+
+Tail read from the 10 streams.
+
+::
+    
+    $ ./distributedlog-tutorials/distributedlog-basic/bin/runner run c.twitter.distributedlog.basic.MultiReader distributedlog://127.0.0.1:2181/messaging/distributedlog/mynamespace stream-0,stream-1,stream-2,stream-3,stream-4,stream-5,stream-6,stream-7,stream-8,stream-9,stream-10
+
+
+Run record generator over some streams
+
+::
+    
+    $ ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.RecordGenerator 'zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy' stream-0 100
+    $ ./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.RecordGenerator 'zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy' stream-1 100
+
+
+Check the terminal running `MultiReader`. You will see similar output as below:
+
+::
+    
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=21044, slotId=0} from stream stream-0
+    """
+    record-1464085079105
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=21046, slotId=0} from stream stream-0
+    """
+    record-1464085079113
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=9636, slotId=0} from stream stream-1
+    """
+    record-1464085079110
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=21048, slotId=0} from stream stream-0
+    """
+    record-1464085079125
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=9638, slotId=0} from stream stream-1
+    """
+    record-1464085079121
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=21050, slotId=0} from stream stream-0
+    """
+    record-1464085079133
+    """
+    Received record DLSN{logSegmentSequenceNo=1, entryId=9640, slotId=0} from stream stream-1
+    """
+    record-1464085079130
+    """
+
+
+
+Please refer to the :doc:`performance` for more details on tuning performance.
diff --git a/docs/operations/main.rst b/docs/operations/main.rst
index d384cac..6eb2a96 100644
--- a/docs/operations/main.rst
+++ b/docs/operations/main.rst
@@ -10,3 +10,4 @@
    hardware
    monitoring
    zookeeper
+   bookkeeper
diff --git a/docs/operations/zookeeper.rst b/docs/operations/zookeeper.rst
index fdc5de2..a0d65a5 100644
--- a/docs/operations/zookeeper.rst
+++ b/docs/operations/zookeeper.rst
@@ -5,3 +5,84 @@
 nodes. There is no constraints on the number of Zookeeper nodes you
 need. One node is enough to run your cluster, but for reliability
 purpose, you should run at least 3 nodes.
+
+Version
+-------
+
+DistributedLog leverages zookeeper `multi` operations for metadata updates.
+So the minimum version of zookeeper is 3.4.*. We recommend running the stable
+zookeeper version `3.4.8`.
+
+Run ZooKeeper from distributedlog source
+----------------------------------------
+
+Since `zookeeper` is one of the dependencies of `distributedlog-service`, you could simply
+run `zookeeper` servers using the same set of scripts provided in `distributedlog-service`.
+In the following sections, we will describe how to run zookeeper using the scripts provided
+in `distributedlog-service`.
+
+Build
++++++
+
+First of all, build DistributedLog:
+
+.. code-block:: bash
+
+    $ mvn clean install -DskipTests
+
+Configuration
++++++++++++++
+
+The configuration file `zookeeper.conf.template` under `distributedlog-service/conf` is a template of
+production configuration to run a zookeeper node. Most of the configuration settings are good for
+production usage. You might need to configure following settings according to your environment and
+hardware platform.
+
+Ensemble
+^^^^^^^^
+
+You need to configure the zookeeper servers that form this ensemble as below:
+
+::
+    
+    server.1=127.0.0.1:2710:3710:participant;0.0.0.0:2181
+
+
+Please check zookeeper_ website for more configurations.
+
+Disks
+^^^^^
+
+You need to configure following settings according to the disk layout of your hardware.
+It is recommended to put `dataLogDir` under a separated disk from others for performance.
+
+::
+    
+    # the directory where the snapshot is stored.
+    dataDir=/tmp/data/zookeeper
+    
+    # where txlogs are written
+    dataLogDir=/tmp/data/zookeeper/txlog
+
+
+Run
++++
+
+As `zookeeper` is shipped as part of `distributedlog-service`, you could use the `dlog-daemon.sh`
+script to start `zookeeper` as a daemon.
+
+Start the zookeeper:
+
+.. code-block:: bash
+
+    $ ./distributedlog-service/bin/dlog-daemon.sh start zookeeper /path/to/zookeeper.conf
+
+Stop the zookeeper:
+
+.. code-block:: bash
+
+    $ ./distributedlog-service/bin/dlog-daemon.sh stop zookeeper
+
+Please check zookeeper_ website for more details.
+
+.. _zookeeper: http://zookeeper.apache.org/
diff --git a/pom.xml b/pom.xml
index e6e692b..74e49b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,8 @@
     <birdcage.sha>6.34.0</birdcage.sha>
     <scrooge.version>4.6.0</scrooge.version>
     <scrooge-maven-plugin.version>3.17.0</scrooge-maven-plugin.version>
+    <codahale.metrics.version>3.0.1</codahale.metrics.version>
+    <jetty.version>8.1.19.v20160209</jetty.version>
   </properties>
   <build>
     <plugins>