ISSUE #197: Provide a guide for running on k8s

Descriptions of the changes in this PR:

Add one page documenting how to deploy on Kubernetes. It is based on http://bookkeeper.apache.org/docs/latest/deployment/kubernetes/, uses the distributedlog image, and adds instructions on how to create DistributedLog namespaces and run the benchmark.
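For reference, the manifests added in this PR can be applied in dependency order (ZooKeeper first, then bookies, then monitoring and the benchmark jobs). This is a sketch; it assumes `kubectl` is already configured against the target GKE cluster:

```shell
# deploy ZooKeeper (StatefulSet + headless service)
kubectl apply -f deploy/kubernetes/gke/zookeeper.yaml
# deploy bookies (DaemonSet) and auto-recovery once ZooKeeper is up
kubectl apply -f deploy/kubernetes/gke/bookkeeper.yaml
# deploy Prometheus + Grafana
kubectl apply -f deploy/kubernetes/gke/monitoring.yaml
# run the write/read benchmark workloads
kubectl apply -f deploy/kubernetes/gke/benchmark-writer.yaml
kubectl apply -f deploy/kubernetes/gke/benchmark-reader.yaml
```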

This change is based on #196

Author: Sijie Guo <sijie@apache.org>

Reviewers: Jia Zhai <None>

This closes #198 from sijie/add_docker, closes #197
diff --git a/deploy/kubernetes/gke/benchmark-reader.yaml b/deploy/kubernetes/gke/benchmark-reader.yaml
new file mode 100644
index 0000000..c1c1c32
--- /dev/null
+++ b/deploy/kubernetes/gke/benchmark-reader.yaml
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+apiVersion: v1
+kind: ConfigMap
+metadata:
+    name: benchmark-reader-config
+data:
+    # benchmark
+    BK_DLOG_ROOT_LOGGER: "INFO,stderr"
+    BK_DL_NAMESPACE: "distributedlog://zookeeper/distributedlog"
+    BK_STATS_PROVIDER: org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider
+    BK_INITIAL_RATE: "100"
+---
+
+## benchmark reader
+
+apiVersion: apps/v1beta1
+kind: Deployment
+metadata:
+    name: benchmark-reader
+spec:
+    replicas: 1
+    template:
+        metadata:
+            labels:
+                app: bookkeeper
+                component: benchmark-reader
+                cluster: bookkeeper-gke
+            annotations:
+                prometheus.io/scrape: "true"
+                prometheus.io/port: "8000"
+        spec:
+            containers:
+              - name: benchmark-reader
+                image: apachedistributedlog/distributedlog:latest
+                resources:
+                  requests:
+                    memory: "3Gi"
+                    cpu: "1000m"
+                  limits:
+                    memory: "5Gi"
+                    cpu: "2000m"
+                command: [ "/bin/bash", "/opt/distributedlog/bin/entrypoint.sh" ]
+                args: [ "/opt/distributedlog/bin/dbench", "read" ]
+                envFrom:
+                  - configMapRef:
+                        name: benchmark-reader-config
+
+---
+
+apiVersion: v1
+kind: Service
+metadata:
+    name: benchmark-reader
+    labels:
+        app: bookkeeper
+        component: benchmark-reader
+spec:
+    clusterIP: None
+    selector:
+        app: bookkeeper
+        component: benchmark-reader
diff --git a/deploy/kubernetes/gke/benchmark-writer.yaml b/deploy/kubernetes/gke/benchmark-writer.yaml
new file mode 100644
index 0000000..1941989
--- /dev/null
+++ b/deploy/kubernetes/gke/benchmark-writer.yaml
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+apiVersion: v1
+kind: ConfigMap
+metadata:
+    name: benchmark-config
+data:
+    # benchmark
+    BK_DLOG_ROOT_LOGGER: "INFO,stderr"
+    BK_DL_NAMESPACE: "distributedlog://zookeeper/distributedlog"
+    BK_STATS_PROVIDER: org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider
+    BK_INITIAL_RATE: "100"
+---
+
+## benchmark writer
+
+apiVersion: apps/v1beta1
+kind: Deployment
+metadata:
+    name: benchmark-writer
+spec:
+    replicas: 1
+    template:
+        metadata:
+            labels:
+                app: bookkeeper
+                component: benchmark-writer
+                cluster: bookkeeper-gke
+            annotations:
+                prometheus.io/scrape: "true"
+                prometheus.io/port: "8000"
+        spec:
+            containers:
+              - name: benchmark-writer
+                image: apachedistributedlog/distributedlog:latest
+                resources:
+                  requests:
+                    memory: "3Gi"
+                    cpu: "1000m"
+                  limits:
+                    memory: "5Gi"
+                    cpu: "2000m"
+                command: [ "/bin/bash", "/opt/distributedlog/bin/entrypoint.sh" ]
+                args: [ "/opt/distributedlog/bin/dbench", "bkwrite" ]
+                envFrom:
+                  - configMapRef:
+                        name: benchmark-config
+
+---
+
+apiVersion: v1
+kind: Service
+metadata:
+    name: benchmark-writer
+    labels:
+        app: bookkeeper
+        component: benchmark-writer
+spec:
+    clusterIP: None
+    selector:
+        app: bookkeeper
+        component: benchmark-writer
diff --git a/deploy/kubernetes/gke/bookkeeper.yaml b/deploy/kubernetes/gke/bookkeeper.yaml
new file mode 100644
index 0000000..fc2c061
--- /dev/null
+++ b/deploy/kubernetes/gke/bookkeeper.yaml
@@ -0,0 +1,156 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+## A reference of https://github.com/apache/incubator-pulsar/blob/master/kubernetes/google-container-engine/bookie.yaml
+
+apiVersion: v1
+kind: ConfigMap
+metadata:
+    name: bookie-config
+data:
+    BK_BOOKIE_EXTRA_OPTS: "\"-Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g -XX:+UseG1GC  -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB\""
+    BK_bookiePort: "3181"
+    BK_journalDirectory: "/bookkeeper/data/journal"
+    BK_ledgerDirectories: "/bookkeeper/data/ledgers"
+    BK_indexDirectories: "/bookkeeper/data/ledgers"
+    BK_zkServers: zookeeper
+    # the default ledger manager is flat, which does not scale to a large number of ledgers
+    BK_ledgerManagerType: "hierarchical"
+    BK_statsProviderClass: org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider
+    BK_DLOG_ROOT_LOGGER: "INFO,stderr"
+---
+
+## BookKeeper servers need to access the local disks and the pods
+## cannot be moved across different nodes.
+## For this reason, we run BK as a daemon set, one for each node in the
+## cluster, unless restricted by label selectors
+apiVersion: extensions/v1beta1
+kind: DaemonSet
+metadata:
+    name: bookie
+    labels:
+        app: bookkeeper
+        component: bookie
+spec:
+    template:
+        metadata:
+            labels:
+                app: bookkeeper
+                component: bookie
+                # Specify cluster to allow aggregation by cluster in
+                # the metrics
+                cluster: bookkeeper-gke
+            annotations:
+                prometheus.io/scrape: "true"
+                prometheus.io/port: "8000"
+
+        spec:
+            containers:
+              - name: bookie
+                image: apachedistributedlog/distributedlog:latest
+                resources:
+                  requests:
+                    memory: "3Gi"
+                    cpu: "1000m"
+                  limits:
+                    memory: "5Gi"
+                    cpu: "2000m"
+                # 
+                command: [ "/bin/bash", "/opt/distributedlog/bin/entrypoint.sh" ]
+                args: [ "/opt/distributedlog/bin/dlog", "org.apache.bookkeeper.proto.BookieServer", "--conf", "/opt/bookkeeper/conf/bk_server.conf" ]
+                ports:
+                  - name: client
+                    containerPort: 3181
+                    # we are using `status.hostIP` as the bookie's advertised address. Expose 3181 as the hostPort
+                    # so that containers are able to reach the bookie through the host port.
+                    hostPort: 3181
+                envFrom:
+                  - configMapRef:
+                        name: bookie-config
+                env:
+                  - name: BK_advertisedAddress
+                    valueFrom:
+                      fieldRef:
+                        fieldPath: status.hostIP
+                        
+
+                volumeMounts:
+                  - name: journal-disk
+                    mountPath: /bookkeeper/data/journal
+                  - name: ledgers-disk
+                    mountPath: /bookkeeper/data/ledgers
+
+            volumes:
+                # Mount local disks
+              - name: journal-disk
+                hostPath:
+                    path: /mnt/disks/ssd0
+              - name: ledgers-disk
+                hostPath:
+                    path: /mnt/disks/ssd1
+
+---
+
+##
+## Define the Bookie headless service
+## In practice, it is only used to provide a view of
+## all the bookie pods that are present.
+##
+apiVersion: v1
+kind: Service
+metadata:
+    name: bookie
+    labels:
+        app: bookkeeper
+        component: bookie
+spec:
+    ports:
+      - port: 3181
+        name: server
+    clusterIP: None
+    selector:
+        app: bookkeeper
+        component: bookie
+
+---
+##
+## Run BookKeeper auto-recovery from a different set of containers
+## Auto-recovery restores the replication factor when a bookie
+## crashes and does not recover on its own.
+##
+apiVersion: apps/v1beta1
+kind: Deployment
+metadata:
+    name: bookie-autorecovery
+spec:
+    replicas: 2
+    template:
+        metadata:
+            labels:
+                app: bookkeeper
+                component: bookkeeper-replication
+        spec:
+            containers:
+              - name: replication-worker
+                image: apache/bookkeeper:latest
+                command: [ "/bin/bash", "/opt/bookkeeper/entrypoint.sh" ]
+                args: ["/opt/bookkeeper/bin/bookkeeper", "autorecovery"]
+                envFrom:
+                  - configMapRef:
+                        name: bookie-config
diff --git a/deploy/kubernetes/gke/monitoring.yaml b/deploy/kubernetes/gke/monitoring.yaml
new file mode 100644
index 0000000..f7a5f0b
--- /dev/null
+++ b/deploy/kubernetes/gke/monitoring.yaml
@@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+apiVersion: v1
+kind: ConfigMap
+metadata:
+    name: prometheus-config
+data:
+    # Include the Prometheus configuration file, set up to monitor all the
+    # Kubernetes pods with the "scrape=true" annotation.
+    prometheus.yml: |
+        global:
+            scrape_interval: 15s
+        scrape_configs:
+          - job_name: 'prometheus'
+            static_configs:
+              - targets: ['localhost:9090']
+          - job_name: 'kubernetes-pods'
+            kubernetes_sd_configs:
+              - role: pod
+
+            relabel_configs:
+              - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
+                action: keep
+                regex: true
+              - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
+                action: replace
+                target_label: __metrics_path__
+                regex: (.+)
+              - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
+                action: replace
+                regex: ([^:]+)(?::\d+)?;(\d+)
+                replacement: $1:$2
+                target_label: __address__
+              - action: labelmap
+                regex: __meta_kubernetes_pod_label_(.+)
+              - source_labels: [__meta_kubernetes_namespace]
+                action: replace
+                target_label: kubernetes_namespace
+              - source_labels: [__meta_kubernetes_pod_label_component]
+                action: replace
+                target_label: job
+              - source_labels: [__meta_kubernetes_pod_name]
+                action: replace
+                target_label: kubernetes_pod_name
+
+---
+
+apiVersion: apps/v1beta1
+kind: StatefulSet
+metadata:
+    name: prometheus
+spec:
+    serviceName: prometheus
+    replicas: 1
+    template:
+        metadata:
+            labels:
+                app: bookkeeper
+                component: prometheus
+        spec:
+            containers:
+              - name: prometheus
+                image: prom/prometheus:v1.7.2
+                volumeMounts:
+                  - name: config-volume
+                    mountPath: /etc/prometheus
+                  - name: data-volume
+                    mountPath: /prometheus
+                ports:
+                  - containerPort: 9090
+            volumes:
+              - name: config-volume
+                configMap:
+                    name: prometheus-config
+    volumeClaimTemplates:
+    - metadata:
+        name: data-volume
+      spec:
+        accessModes: [ "ReadWriteOnce" ]
+        resources:
+            requests:
+                storage: 5Gi
+
+---
+
+apiVersion: v1
+kind: Service
+metadata:
+    name: prometheus
+    labels:
+        app: bookkeeper
+        component: prometheus
+spec:
+    ports:
+      - port: 9090
+        name: server
+    clusterIP: None
+    selector:
+        app: bookkeeper
+        component: prometheus
+
+
+---
+## GRAFANA
+
+apiVersion: apps/v1beta1
+kind: Deployment
+metadata:
+    name: grafana
+spec:
+    replicas: 1
+    template:
+        metadata:
+            labels:
+                app: bookkeeper
+                component: grafana
+        spec:
+            containers:
+              - name: grafana
+                image: apachedistributedlog/distributedlog-grafana:latest
+                ports:
+                  - containerPort: 3000
+                env:
+                  - name: PROMETHEUS_URL
+                    value: http://prometheus:9090/
+
+---
+
+apiVersion: v1
+kind: Service
+metadata:
+    name: grafana
+    labels:
+        app: bookkeeper
+        component: grafana
+spec:
+    ports:
+      - port: 3000
+        name: server
+    clusterIP: None
+    selector:
+        app: bookkeeper
+        component: grafana
diff --git a/deploy/kubernetes/gke/zookeeper.yaml b/deploy/kubernetes/gke/zookeeper.yaml
new file mode 100644
index 0000000..12fdfa6
--- /dev/null
+++ b/deploy/kubernetes/gke/zookeeper.yaml
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+## A reference of https://github.com/kubernetes/contrib/blob/master/statefulsets/zookeeper/zookeeper.yaml
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: zookeeper
+  labels:
+    app: bookkeeper
+    component: zookeeper
+spec:
+  ports:
+  - port: 2888
+    name: server
+  - port: 3888
+    name: leader-election
+  clusterIP: None
+  selector:
+    app: bookkeeper
+    component: zookeeper
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: zk-cm
+data:
+  jvm.heap: "1G"
+  tick: "2000"
+  init: "10"
+  sync: "5"
+  client.cnxns: "60"
+  snap.retain: "3"
+  purge.interval: "0"
+---
+apiVersion: policy/v1beta1
+kind: PodDisruptionBudget
+metadata:
+  name: zk-pdb
+spec:
+  selector:
+    matchLabels:
+      component: zookeeper
+  minAvailable: 2
+---
+apiVersion: apps/v1beta1
+kind: StatefulSet
+metadata:
+  name: zk
+  labels:
+    app: bookkeeper
+    component: zookeeper
+spec:
+  serviceName: zookeeper
+  replicas: 3
+  template:
+    metadata:
+      labels:
+        app: bookkeeper
+        component: zookeeper
+        cluster: bookkeeper-gke
+      annotations:
+        pod.alpha.kubernetes.io/initialized: "true"
+        prometheus.io/scrape: "true"
+        prometheus.io/port: "8080"
+
+    spec:
+      affinity:
+        podAntiAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            - labelSelector:
+                matchExpressions:
+                  - key: "app"
+                    operator: In
+                    values: 
+                      - zookeeper
+              topologyKey: "kubernetes.io/hostname"
+      containers:
+      - name: k8szk
+        imagePullPolicy: Always
+        image: gcr.io/google_samples/k8szk:v2
+        resources:
+          requests:
+            memory: "1Gi"
+            cpu: "500m"
+        ports:
+        - containerPort: 2181
+          name: client
+        - containerPort: 2888
+          name: server
+        - containerPort: 3888
+          name: leader-election
+        env:
+        - name : ZK_REPLICAS
+          value: "3"
+        - name : ZK_HEAP_SIZE
+          valueFrom:
+            configMapKeyRef:
+                name: zk-cm
+                key: jvm.heap
+        - name : ZK_TICK_TIME
+          valueFrom:
+            configMapKeyRef:
+                name: zk-cm
+                key: tick
+        - name : ZK_INIT_LIMIT
+          valueFrom:
+            configMapKeyRef:
+                name: zk-cm
+                key: init
+        - name : ZK_SYNC_LIMIT
+          valueFrom:
+            configMapKeyRef:
+                name: zk-cm
+                key: sync
+        - name : ZK_MAX_CLIENT_CNXNS
+          valueFrom:
+            configMapKeyRef:
+                name: zk-cm
+                key: client.cnxns
+        - name: ZK_SNAP_RETAIN_COUNT
+          valueFrom:
+            configMapKeyRef:
+                name: zk-cm
+                key: snap.retain
+        - name: ZK_PURGE_INTERVAL
+          valueFrom:
+            configMapKeyRef:
+                name: zk-cm
+                key: purge.interval
+        - name: ZK_CLIENT_PORT
+          value: "2181"
+        - name: ZK_SERVER_PORT
+          value: "2888"
+        - name: ZK_ELECTION_PORT
+          value: "3888"
+        command:
+        - sh
+        - -c
+        - zkGenConfig.sh && zkServer.sh start-foreground
+        readinessProbe:
+          exec:
+            command:
+            - "zkOk.sh"
+          initialDelaySeconds: 10
+          timeoutSeconds: 5
+        livenessProbe:
+          exec:
+            command:
+            - "zkOk.sh"
+          initialDelaySeconds: 10
+          timeoutSeconds: 5
+        volumeMounts:
+        - name: datadir
+          mountPath: /var/lib/zookeeper
+      securityContext:
+        runAsUser: 1000
+        fsGroup: 1000
+  volumeClaimTemplates:
+  - metadata:
+      name: datadir
+    spec:
+      accessModes: [ "ReadWriteOnce" ]
+      resources:
+        requests:
+          storage: 5Gi
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java
index d877812..b9f3a72 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java
@@ -26,16 +26,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-
 import java.util.concurrent.CountDownLatch;
-
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.DistributedLogConfiguration;
@@ -43,10 +41,8 @@
 import org.apache.distributedlog.common.concurrent.FutureEventListener;
 import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
-
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Utils;
-
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -54,8 +50,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
 /**
  * LedgerAllocator impl.
  */
@@ -203,7 +197,7 @@
                     return;
                 }
                 Versioned<byte[]> allocatorData =
-                        new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()));
+                        new Versioned<byte[]>(data, new LongVersion(stat.getVersion()));
                 SimpleLedgerAllocator allocator =
                         new SimpleLedgerAllocator(path, allocatorData, quorumConfigProvider, zkc, bkc);
                 allocator.start();
@@ -262,7 +256,7 @@
                     SimpleLedgerAllocator newAllocator = null;
                     if (KeeperException.Code.OK.intValue() == rc) {
                         Versioned<byte[]> allocatorData =
-                                new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()));
+                                new Versioned<byte[]>(data, new LongVersion(stat.getVersion()));
                         logger.info("Rescuing ledger allocator {}.", path);
                         newAllocator = new SimpleLedgerAllocator(path, allocatorData, quorumConfigProvider, zkc, bkc);
                         newAllocator.start();
@@ -448,6 +442,6 @@
             allocatorsToDelete,
             allocator -> allocator.delete(),
             scheduledExecutorService
-        ).thenCompose(values -> Utils.zkDelete(zkc, poolPath, new ZkVersion(-1)));
+        ).thenCompose(values -> Utils.zkDelete(zkc, poolPath, new LongVersion(-1)));
     }
 }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
index fbdc3dd..d87f557 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
@@ -25,31 +25,26 @@
 import java.util.concurrent.CompletionStage;
 import java.util.function.Function;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.common.concurrent.FutureEventListener;
-
 import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.DLUtils;
-
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Transaction.OpListener;
 import org.apache.distributedlog.util.Utils;
 import org.apache.distributedlog.zk.ZKTransaction;
 import org.apache.distributedlog.zk.ZKVersionedSetOp;
-
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
 /**
  * Allocator to allocate ledgers.
  */
@@ -96,7 +91,7 @@
     // allocation phase
     Phase phase = Phase.HANDED_OVER;
     // version
-    ZkVersion version = new ZkVersion(-1);
+    LongVersion version = new LongVersion(-1);
     // outstanding allocation
     CompletableFuture<LedgerHandle> allocatePromise;
     // outstanding tryObtain transaction
@@ -139,7 +134,7 @@
                         public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
                             if (KeeperException.Code.OK.intValue() == rc) {
                                 promise.complete(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
-                                        new ZkVersion(stat.getVersion())));
+                                        new LongVersion(stat.getVersion())));
                             } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                                 FutureUtils.proxyTo(
                                   Utils.zkGetData(zkc, allocatePath, false),
@@ -207,7 +202,7 @@
      *          Allocation Data.
      */
     private void initialize(Versioned<byte[]> allocationData) {
-        setVersion((ZkVersion) allocationData.getVersion());
+        setVersion((LongVersion) allocationData.getVersion());
         byte[] data = allocationData.getValue();
         if (null != data && data.length > 0) {
             // delete the allocated ledger since this is left by last allocation.
@@ -261,10 +256,10 @@
 
     @Override
     public void onCommit(Version r) {
-        confirmObtain((ZkVersion) r);
+        confirmObtain((LongVersion) r);
     }
 
-    private void confirmObtain(ZkVersion zkVersion) {
+    private void confirmObtain(LongVersion zkVersion) {
         boolean shouldAllocate = false;
         OpListener<LedgerHandle> listenerToNotify = null;
         LedgerHandle lhToNotify = null;
@@ -344,7 +339,7 @@
             return;
         }
         org.apache.zookeeper.Op zkSetDataOp = org.apache.zookeeper.Op.setData(
-                allocatePath, DistributedLogConstants.EMPTY_BYTES, version.getZnodeVersion());
+                allocatePath, DistributedLogConstants.EMPTY_BYTES, (int) version.getLongVersion());
         ZKVersionedSetOp commitOp = new ZKVersionedSetOp(zkSetDataOp, this);
         tryObtainTxn.addOp(commitOp);
         setPhase(Phase.HANDING_OVER);
@@ -368,11 +363,11 @@
         failAllocation(cause);
     }
 
-    private synchronized ZkVersion getVersion() {
+    private synchronized LongVersion getVersion() {
         return version;
     }
 
-    private synchronized void setVersion(ZkVersion newVersion) {
+    private synchronized void setVersion(LongVersion newVersion) {
         Version.Occurred occurred = newVersion.compare(version);
         if (occurred == Version.Occurred.AFTER) {
             LOG.info("Ledger allocator for {} moved version from {} to {}.",
@@ -387,9 +382,9 @@
     private void markAsAllocated(final LedgerHandle lh) {
         byte[] data = DLUtils.logSegmentId2Bytes(lh.getId());
         Utils.zkSetData(zkc, allocatePath, data, getVersion())
-            .whenComplete(new FutureEventListener<ZkVersion>() {
+            .whenComplete(new FutureEventListener<LongVersion>() {
                 @Override
-                public void onSuccess(ZkVersion version) {
+                public void onSuccess(LongVersion version) {
                     // we only issue deleting ledger left from previous allocation when we could allocate first ledger
                     // as zookeeper version could prevent us doing stupid things.
                     deleteLedgerLeftFromPreviousAllocationIfNecessary();
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
index d11fa02..b57a027 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -20,7 +20,6 @@
 import static com.google.common.base.Charsets.UTF_8;
 
 import com.google.common.collect.ImmutableList;
-
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -32,7 +31,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.DistributedLogConfiguration;
@@ -55,7 +54,6 @@
 import org.apache.distributedlog.zk.ZKOp;
 import org.apache.distributedlog.zk.ZKTransaction;
 import org.apache.distributedlog.zk.ZKVersionedSetOp;
-
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -66,8 +64,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
 /**
  * ZooKeeper based log segment metadata store.
  */
@@ -229,10 +225,10 @@
                                                  Versioned<Long> lssn,
                                                  Transaction.OpListener<Version> listener) {
         Version version = lssn.getVersion();
-        assert(version instanceof ZkVersion);
-        ZkVersion zkVersion = (ZkVersion) version;
+        assert(version instanceof LongVersion);
+        LongVersion zkVersion = (LongVersion) version;
         byte[] data = DLUtils.serializeLogSegmentSequenceNumber(lssn.getValue());
-        Op setDataOp = Op.setData(logMetadata.getLogSegmentsPath(), data, zkVersion.getZnodeVersion());
+        Op setDataOp = Op.setData(logMetadata.getLogSegmentsPath(), data, (int) zkVersion.getLongVersion());
         ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener);
         txn.addOp(zkOp);
     }
@@ -243,10 +239,10 @@
                               Versioned<Long> transactionId,
                               Transaction.OpListener<Version> listener) {
         Version version = transactionId.getVersion();
-        assert(version instanceof ZkVersion);
-        ZkVersion zkVersion = (ZkVersion) version;
+        assert(version instanceof LongVersion);
+        LongVersion zkVersion = (LongVersion) version;
         byte[] data = DLUtils.serializeTransactionId(transactionId.getValue());
-        Op setDataOp = Op.setData(logMetadata.getMaxTxIdPath(), data, zkVersion.getZnodeVersion());
+        Op setDataOp = Op.setData(logMetadata.getMaxTxIdPath(), data, (int) zkVersion.getLongVersion());
         ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener);
         txn.addOp(zkOp);
     }
@@ -375,7 +371,7 @@
         CompletableFuture<Versioned<List<String>>> result = ((CompletableFuture<Versioned<List<String>>>) ctx);
         if (KeeperException.Code.OK.intValue() == rc) {
             /** cversion: the number of changes to the children of this znode **/
-            ZkVersion zkVersion = new ZkVersion(stat.getCversion());
+            LongVersion zkVersion = new LongVersion(stat.getCversion());
             result.complete(new Versioned(children, zkVersion));
         } else if (KeeperException.Code.NONODE.intValue() == rc) {
             result.completeExceptionally(new LogNotFoundException("Log " + path + " not found"));
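The changes above follow one recurring pattern: `ZkVersion` exposed an `int` znode version via `getZnodeVersion()`, while its replacement `LongVersion` stores a `long`, so every call site that hands the version back to a ZooKeeper API now narrows with `(int) version.getLongVersion()`. The following is a minimal self-contained sketch of that narrowing (the nested `LongVersion` here is a stand-in, not the actual BookKeeper class), including a guard that the real call sites rely on implicitly since znode versions always fit in an `int`:

```java
public class VersionMigrationSketch {
    // Stand-in for org.apache.bookkeeper.versioning.LongVersion.
    static final class LongVersion {
        private final long version;
        LongVersion(long version) { this.version = version; }
        long getLongVersion() { return version; }
    }

    // ZooKeeper setData/delete take an int expected version; -1 means "match any".
    static int toZnodeVersion(LongVersion v) {
        long raw = v.getLongVersion();
        // Znode versions fit in an int; guard the narrowing cast anyway.
        if (raw > Integer.MAX_VALUE || raw < -1) {
            throw new IllegalArgumentException("not a znode version: " + raw);
        }
        return (int) raw;
    }

    public static void main(String[] args) {
        System.out.println(toZnodeVersion(new LongVersion(7L)));  // 7
        System.out.println(toZnodeVersion(new LongVersion(-1L))); // -1
    }
}
```

The `-1L` case matters for call sites like `zkDeleteIfNotExist`, where `new LongVersion(-1L)` preserves ZooKeeper's "delete regardless of version" semantics.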
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 04db205..3c55edc 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -22,7 +22,6 @@
 import static org.apache.distributedlog.metadata.LogMetadata.*;
 
 import com.google.common.base.Optional;
-
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URI;
@@ -30,15 +29,13 @@
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
 import java.util.function.Function;
-import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.ZooKeeperClient;
-
 import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.common.util.PermitManager;
 import org.apache.distributedlog.common.util.SchedulerUtils;
@@ -59,16 +56,12 @@
 import org.apache.distributedlog.metadata.LogMetadataForReader;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
-
-
 import org.apache.distributedlog.util.DLUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Utils;
 import org.apache.distributedlog.zk.LimitedPermitManager;
 import org.apache.distributedlog.zk.ZKTransaction;
-
-
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -82,9 +75,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
-
 /**
  * zookeeper based {@link LogStreamMetadataStore}.
  */
@@ -458,7 +448,7 @@
                         if (null == dataCreated) {
                             finalMetadatas.add(metadatas.get(i));
                         } else {
-                            finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
+                            finalMetadatas.add(new Versioned<byte[]>(dataCreated, new LongVersion(0)));
                         }
                     }
                     promise.complete(finalMetadatas);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
index 89c3e12..ad4d7b3 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
@@ -25,7 +25,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.ZooKeeperClient;
@@ -33,7 +33,6 @@
 import org.apache.distributedlog.api.subscription.SubscriptionsStore;
 import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
-
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
@@ -137,7 +136,7 @@
     public CompletableFuture<Boolean> deleteSubscriber(String subscriberId) {
         subscribers.remove(subscriberId);
         String path = getSubscriberZKPath(subscriberId);
-        return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1));
+        return Utils.zkDeleteIfNotExist(zkc, path, new LongVersion(-1L));
     }
 
     @Override
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java
index 76f7978..8b18e02 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java
@@ -31,10 +31,9 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.ZooKeeperClient;
@@ -340,7 +339,7 @@
                     if (null == stat) {
                         promise.complete(new Versioned<byte[]>(null, null));
                     } else {
-                        promise.complete(new Versioned<byte[]>(data, new ZkVersion(stat.getVersion())));
+                        promise.complete(new Versioned<byte[]>(data, new LongVersion(stat.getVersion())));
                     }
                 } else if (KeeperException.Code.NONODE.intValue() == rc) {
                     promise.complete(new Versioned<byte[]>(null, null));
@@ -352,8 +351,8 @@
         return promise;
     }
 
-    public static CompletableFuture<ZkVersion> zkSetData(ZooKeeperClient zkc,
-                                                         String path, byte[] data, ZkVersion version) {
+    public static CompletableFuture<LongVersion> zkSetData(ZooKeeperClient zkc,
+                                                         String path, byte[] data, LongVersion version) {
         ZooKeeper zk;
         try {
             zk = zkc.get();
@@ -378,13 +377,14 @@
      *          version used to set data
      * @return future representing the version after this operation.
      */
-    public static CompletableFuture<ZkVersion> zkSetData(ZooKeeper zk, String path, byte[] data, ZkVersion version) {
-        final CompletableFuture<ZkVersion> promise = new CompletableFuture<ZkVersion>();
-        zk.setData(path, data, version.getZnodeVersion(), new AsyncCallback.StatCallback() {
+    public static CompletableFuture<LongVersion> zkSetData(
+            ZooKeeper zk, String path, byte[] data, LongVersion version) {
+        final CompletableFuture<LongVersion> promise = new CompletableFuture<LongVersion>();
+        zk.setData(path, data, (int) version.getLongVersion(), new AsyncCallback.StatCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx, Stat stat) {
                 if (KeeperException.Code.OK.intValue() == rc) {
-                    promise.complete(new ZkVersion(stat.getVersion()));
+                    promise.complete(new LongVersion(stat.getVersion()));
                     return;
                 }
                 promise.completeExceptionally(
@@ -395,7 +395,7 @@
         return promise;
     }
 
-    public static CompletableFuture<Void> zkDelete(ZooKeeperClient zkc, String path, ZkVersion version) {
+    public static CompletableFuture<Void> zkDelete(ZooKeeperClient zkc, String path, LongVersion version) {
         ZooKeeper zk;
         try {
             zk = zkc.get();
@@ -418,9 +418,9 @@
      *          version used to set data
      * @return future representing the version after this operation.
      */
-    public static CompletableFuture<Void> zkDelete(ZooKeeper zk, String path, ZkVersion version) {
+    public static CompletableFuture<Void> zkDelete(ZooKeeper zk, String path, LongVersion version) {
         final CompletableFuture<Void> promise = new CompletableFuture<Void>();
-        zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() {
+        zk.delete(path, (int) version.getLongVersion(), new AsyncCallback.VoidCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx) {
                 if (KeeperException.Code.OK.intValue() == rc) {
@@ -448,7 +448,7 @@
      * false if the node doesn't exist, otherwise future will throw exception
      *
      */
-    public static CompletableFuture<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) {
+    public static CompletableFuture<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, String path, LongVersion version) {
         ZooKeeper zk;
         try {
             zk = zkc.get();
@@ -458,7 +458,7 @@
             return FutureUtils.exception(zkException(e, path));
         }
         final CompletableFuture<Boolean> promise = new CompletableFuture<Boolean>();
-        zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() {
+        zk.delete(path, (int) version.getLongVersion(), new AsyncCallback.VoidCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx) {
                 if (KeeperException.Code.OK.intValue() == rc) {
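The reworked `Utils.zkSetData` keeps the same shape throughout: wrap ZooKeeper's callback-style async API in a `CompletableFuture`, completing it with the new version on `OK` and exceptionally otherwise. A self-contained sketch of that bridging idiom, with a hypothetical `StatCallback`/`asyncSetData` standing in for the ZooKeeper client (the real code completes a `LongVersion`, modeled here as a plain `Long`):

```java
import java.util.concurrent.CompletableFuture;

public class ZkSetDataSketch {
    // Hypothetical stand-in for AsyncCallback.StatCallback.
    interface StatCallback { void processResult(int rc, int newVersion); }

    // Hypothetical stand-in for ZooKeeper's async setData: succeeds and
    // bumps the znode version by one.
    static void asyncSetData(byte[] data, int expectedVersion, StatCallback cb) {
        cb.processResult(0 /* KeeperException.Code.OK */, expectedVersion + 1);
    }

    // The bridging idiom: complete a promise from inside the callback.
    static CompletableFuture<Long> setData(byte[] data, long expectedVersion) {
        CompletableFuture<Long> promise = new CompletableFuture<>();
        asyncSetData(data, (int) expectedVersion, (rc, newVersion) -> {
            if (rc == 0) {
                promise.complete((long) newVersion);
            } else {
                promise.completeExceptionally(new RuntimeException("rc=" + rc));
            }
        });
        return promise;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(setData(new byte[0], 3L).get()); // 4
    }
}
```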
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKVersionedSetOp.java b/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKVersionedSetOp.java
index 947b112..64224ad 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKVersionedSetOp.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKVersionedSetOp.java
@@ -18,7 +18,7 @@
 package org.apache.distributedlog.zk;
 
 import javax.annotation.Nullable;
-import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.distributedlog.util.Transaction.OpListener;
 import org.apache.zookeeper.KeeperException;
@@ -44,7 +44,7 @@
         assert(opResult instanceof OpResult.SetDataResult);
         OpResult.SetDataResult setDataResult = (OpResult.SetDataResult) opResult;
         if (null != listener) {
-            listener.onCommit(new ZkVersion(setDataResult.getStat().getVersion()));
+            listener.onCommit(new LongVersion(setDataResult.getStat().getVersion()));
         }
     }
 
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java
index 4f52cb2..c0eecdf 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java
@@ -84,7 +84,7 @@
     @Test(timeout = 20000)
     public void loadStreamConfNullOverrides() throws Exception {
         DistributedLogConfiguration conf = new DistributedLogConfiguration();
-        DistributedLogConfiguration confClone = (DistributedLogConfiguration) conf.clone();
+        DistributedLogConfiguration confClone = new DistributedLogConfiguration();
         Optional<DistributedLogConfiguration> streamConfiguration = Optional.absent();
         conf.loadStreamConf(streamConfiguration);
 
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java
index e76ee6a..cfaa406 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java
@@ -17,14 +17,14 @@
  */
 package org.apache.distributedlog;
 
-
 import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
-import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -40,9 +40,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
-
 /**
  * Test Cases for LogSegmentsZK.
  */
@@ -56,7 +53,7 @@
         String logSegmentsPath = LogMetadata.getLogSegmentsPath(
                 uri, streamName, conf.getUnpartitionedStreamName());
         byte[] data = zkc.get().getData(logSegmentsPath, false, stat);
-        Versioned<byte[]> maxLSSNData = new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()));
+        Versioned<byte[]> maxLSSNData = new Versioned<byte[]>(data, new LongVersion(stat.getVersion()));
         return new MaxLogSegmentSequenceNo(maxLSSNData);
     }
 
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
index f146c1e..0270a4a 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import java.net.URI;
 import java.util.Enumeration;
 import java.util.HashSet;
@@ -30,26 +31,22 @@
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.BookKeeperClientBuilder;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
-
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.bk.SimpleLedgerAllocator.AllocationException;
 import org.apache.distributedlog.bk.SimpleLedgerAllocator.Phase;
 import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
-
-
 import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.util.Transaction.OpListener;
 import org.apache.distributedlog.util.Utils;
 import org.apache.distributedlog.zk.DefaultZKOp;
 import org.apache.distributedlog.zk.ZKTransaction;
-
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
@@ -64,9 +61,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
-
 /**
  * TestLedgerAllocator.
  */
@@ -174,7 +168,7 @@
         zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         Stat stat = new Stat();
         byte[] data = zkc.get().getData(allocationPath, false, stat);
-        Versioned<byte[]> allocationData = new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()));
+        Versioned<byte[]> allocationData = new Versioned<byte[]>(data, new LongVersion(stat.getVersion()));
 
         SimpleLedgerAllocator allocator1 =
                 new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
@@ -241,7 +235,7 @@
         Stat stat = new Stat();
         byte[] data = zkc.get().getData(allocationPath, false, stat);
 
-        Versioned<byte[]> allocationData = new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()));
+        Versioned<byte[]> allocationData = new Versioned<byte[]>(data, new LongVersion(stat.getVersion()));
 
         SimpleLedgerAllocator allocator1 =
                 new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
@@ -253,7 +247,7 @@
         // Second allocator kicks in
         stat = new Stat();
         data = zkc.get().getData(allocationPath, false, stat);
-        allocationData = new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()));
+        allocationData = new Versioned<byte[]>(data, new LongVersion(stat.getVersion()));
         SimpleLedgerAllocator allocator2 =
                 new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
         allocator2.allocate();
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index 6e1d0b8..613787f 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -17,8 +17,14 @@
  */
 package org.apache.distributedlog.impl;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import com.google.common.collect.Lists;
 import java.net.URI;
 import java.util.Collections;
@@ -27,7 +33,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.DLMTestUtil;
@@ -45,7 +51,6 @@
 import org.apache.distributedlog.util.DLUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Transaction;
-
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -59,10 +64,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
-
-
 /**
  * Test ZK based log segment metadata store.
  */
@@ -637,7 +638,7 @@
     @Test(timeout = 60000)
     public void testStoreMaxLogSegmentSequenceNumber() throws Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
-        Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
+        Versioned<Long> value = new Versioned<Long>(999L, new LongVersion(0));
         final CompletableFuture<Version> result = new CompletableFuture<Version>();
         LogMetadata metadata = mock(LogMetadata.class);
         when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
@@ -654,7 +655,7 @@
             }
         });
         Utils.ioResult(updateTxn.execute());
-        assertEquals(1, ((ZkVersion) Utils.ioResult(result)).getZnodeVersion());
+        assertEquals(1L, ((LongVersion) Utils.ioResult(result)).getLongVersion());
         Stat stat = new Stat();
         byte[] data = zkc.get().getData(rootZkPath, false, stat);
         assertEquals(999L, DLUtils.deserializeLogSegmentSequenceNumber(data));
@@ -664,7 +665,7 @@
     @Test(timeout = 60000)
     public void testStoreMaxLogSegmentSequenceNumberBadVersion() throws Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
-        Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
+        Versioned<Long> value = new Versioned<Long>(999L, new LongVersion(10));
         final CompletableFuture<Version> result = new CompletableFuture<Version>();
         LogMetadata metadata = mock(LogMetadata.class);
         when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
@@ -701,7 +702,7 @@
     @Test(timeout = 60000)
     public void testStoreMaxLogSegmentSequenceNumberOnNonExistentPath() throws Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
-        Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
+        Versioned<Long> value = new Versioned<Long>(999L, new LongVersion(10));
         final CompletableFuture<Version> result = new CompletableFuture<Version>();
         String nonExistentPath = rootZkPath + "/non-existent";
         LogMetadata metadata = mock(LogMetadata.class);
@@ -735,7 +736,7 @@
     @Test(timeout = 60000)
     public void testStoreMaxTxnId() throws Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
-        Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
+        Versioned<Long> value = new Versioned<Long>(999L, new LongVersion(0));
         final CompletableFuture<Version> result = new CompletableFuture<Version>();
         LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
         when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
@@ -752,7 +753,7 @@
             }
         });
         Utils.ioResult(updateTxn.execute());
-        assertEquals(1, ((ZkVersion) Utils.ioResult(result)).getZnodeVersion());
+        assertEquals(1L, ((LongVersion) Utils.ioResult(result)).getLongVersion());
         Stat stat = new Stat();
         byte[] data = zkc.get().getData(rootZkPath, false, stat);
         assertEquals(999L, DLUtils.deserializeTransactionId(data));
@@ -762,7 +763,7 @@
     @Test(timeout = 60000)
     public void testStoreMaxTxnIdBadVersion() throws Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
-        Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
+        Versioned<Long> value = new Versioned<Long>(999L, new LongVersion(10));
         final CompletableFuture<Version> result = new CompletableFuture<Version>();
         LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
         when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
@@ -799,7 +800,7 @@
     @Test(timeout = 60000)
     public void testStoreMaxTxnIdOnNonExistentPath() throws Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
-        Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
+        Versioned<Long> value = new Versioned<Long>(999L, new LongVersion(10));
         final CompletableFuture<Version> result = new CompletableFuture<Version>();
         String nonExistentPath = rootZkPath + "/non-existent";
         LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
index cd894ae..f1cec9d 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -19,12 +19,15 @@
 
 import static org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
 import static org.apache.distributedlog.metadata.LogMetadata.*;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.collect.Lists;
 import java.net.URI;
 import java.util.List;
-import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DistributedLogConfiguration;
@@ -38,10 +41,8 @@
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.metadata.DLMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
-
 import org.apache.distributedlog.util.DLUtils;
 import org.apache.distributedlog.util.Utils;
-
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Transaction;
@@ -54,10 +55,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
-
-
 /**
  * Test {@link ZKLogStreamMetadataStore}.
  */
@@ -188,7 +185,7 @@
 
         for (Versioned<byte[]> metadata : metadatas) {
             assertTrue(pathExists(metadata));
-            assertTrue(((ZkVersion) metadata.getVersion()).getZnodeVersion() >= 0);
+            assertTrue(((LongVersion) metadata.getVersion()).getLongVersion() >= 0L);
         }
 
         Versioned<byte[]> logSegmentsData = logMetadata.getMaxLSSNData();
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
index bc45b9c..58c0de6 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
@@ -17,23 +17,23 @@
  */
 package org.apache.distributedlog.impl.metadata;
 
+import static org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.intToBytes;
+import static org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.processLogMetadatas;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
-import static org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
-import static org.junit.Assert.*;
 import com.google.common.collect.Lists;
 import java.net.URI;
 import java.util.List;
-import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.exceptions.UnexpectedException;
 import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.DLUtils;
-
 import org.junit.Test;
 
-
 /**
  * TestZKLogStreamMetadataStoreUtils.
  */
@@ -63,7 +63,7 @@
         List<Versioned<byte[]>> metadatas = Lists.newArrayList(
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new LongVersion(1)),
                 new Versioned<byte[]>(null, null));
         processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
     }
@@ -78,7 +78,7 @@
         List<Versioned<byte[]>> metadatas = Lists.newArrayList(
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new LongVersion(1)),
                 new Versioned<byte[]>(intToBytes(9999), null));
         processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
     }
@@ -93,7 +93,7 @@
         List<Versioned<byte[]>> metadatas = Lists.newArrayList(
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new LongVersion(1)),
                 new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null),
                 new Versioned<byte[]>(null, null));
         processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
@@ -109,9 +109,9 @@
         List<Versioned<byte[]>> metadatas = Lists.newArrayList(
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new LongVersion(1)),
                 new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                new Versioned<byte[]>(new byte[0], new LongVersion(1)),
                 new Versioned<byte[]>(null, null));
         processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
     }
@@ -126,10 +126,10 @@
         List<Versioned<byte[]>> metadatas = Lists.newArrayList(
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new LongVersion(1)),
                 new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                new Versioned<byte[]>(new byte[0], new LongVersion(1)),
+                new Versioned<byte[]>(new byte[0], new LongVersion(1)),
                 new Versioned<byte[]>(null, null));
         processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
     }
@@ -144,11 +144,11 @@
         List<Versioned<byte[]>> metadatas = Lists.newArrayList(
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new LongVersion(1)),
                 new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(new byte[0], new LongVersion(1)),
+                new Versioned<byte[]>(new byte[0], new LongVersion(1)),
+                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new LongVersion(1)),
                 new Versioned<byte[]>(null, null));
         processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
     }
@@ -161,16 +161,16 @@
         String logName = "test-log";
         String logIdentifier = "<default>";
         Versioned<byte[]> maxTxnIdData =
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1));
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new LongVersion(1));
         Versioned<byte[]> logSegmentsData =
-                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1));
+                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new LongVersion(1));
         List<Versioned<byte[]>> metadatas = Lists.newArrayList(
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 maxTxnIdData,
                 new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                new Versioned<byte[]>(new byte[0], new LongVersion(1)),
+                new Versioned<byte[]>(new byte[0], new LongVersion(1)),
                 logSegmentsData);
         LogMetadataForWriter metadata =
                 processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
@@ -188,18 +188,18 @@
         String logName = "test-log";
         String logIdentifier = "<default>";
         Versioned<byte[]> maxTxnIdData =
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1));
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new LongVersion(1));
         Versioned<byte[]> logSegmentsData =
-                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1));
+                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new LongVersion(1));
         Versioned<byte[]> allocationData =
-                new Versioned<byte[]>(DLUtils.logSegmentId2Bytes(1L), new ZkVersion(1));
+                new Versioned<byte[]>(DLUtils.logSegmentId2Bytes(1L), new LongVersion(1));
         List<Versioned<byte[]>> metadatas = Lists.newArrayList(
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 maxTxnIdData,
                 new Versioned<byte[]>(intToBytes(LogMetadata.LAYOUT_VERSION), null),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                new Versioned<byte[]>(new byte[0], new LongVersion(1)),
+                new Versioned<byte[]>(new byte[0], new LongVersion(1)),
                 logSegmentsData,
                 allocationData);
         LogMetadataForWriter metadata =
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java
index 752bc35..de5da8c 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java
@@ -18,10 +18,14 @@
 package org.apache.distributedlog.util;
 
 import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
 import com.google.common.base.Optional;
 import java.util.concurrent.CountDownLatch;
-import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
@@ -34,9 +38,6 @@
 import org.junit.Before;
 import org.junit.Test;
 
-
-
-
 /**
  * Test Utils.
  */
@@ -121,7 +122,7 @@
         assertArrayEquals("Data should return as written",
                 rawData, data.getValue());
         assertEquals("Version should be zero",
-                0, ((ZkVersion) data.getVersion()).getZnodeVersion());
+                0L, ((LongVersion) data.getVersion()).getLongVersion());
     }
 
     @Test(timeout = 60000)
diff --git a/docker/build.sh b/docker/build.sh
index 879fbdb..d957471 100755
--- a/docker/build.sh
+++ b/docker/build.sh
@@ -25,7 +25,7 @@
 ALL_MODULE="distributedlog-dist"
 DOCKER_IMAGE_NAME="distributedlog"
 DOCKER_GRAFANA_NAME="distributedlog-grafana"
-DOCKER_IMAGE_VERSION="nightly"
+DOCKER_IMAGE_VERSION="latest"
 
 echo "distributedlog version: ${MVN_VERSION}"
 
diff --git a/docker/grafana/dashboards/jvm.json b/docker/grafana/dashboards/jvm.json
index 7f6d422..57649b4 100644
--- a/docker/grafana/dashboards/jvm.json
+++ b/docker/grafana/dashboards/jvm.json
@@ -2550,6 +2550,6 @@
     ]
   },
   "timezone": "browser",
-  "title": "Pulsar - JVM",
+  "title": "JVM",
   "version": 27
 }
diff --git a/docker/publish-grafana.sh b/docker/publish-grafana.sh
index 38f0dfa..83ee037 100755
--- a/docker/publish-grafana.sh
+++ b/docker/publish-grafana.sh
@@ -47,7 +47,7 @@
 # Fail if any of the subsequent commands fail
 set -e
 
-docker tag distributedlog-grafana:nightly $DOCKER_ORG/distributedlog-grafana:nightly
+docker tag distributedlog-grafana:latest $DOCKER_ORG/distributedlog-grafana:latest
 
 # Push all images and tags
-docker push $DOCKER_ORG/distributedlog-grafana:nightly
+docker push $DOCKER_ORG/distributedlog-grafana:latest
diff --git a/docker/publish.sh b/docker/publish.sh
index 6aebe09..617ccae 100755
--- a/docker/publish.sh
+++ b/docker/publish.sh
@@ -47,9 +47,9 @@
 # Fail if any of the subsequent commands fail
 set -e
 
-docker tag distributedlog:nightly $DOCKER_ORG/distributedlog:nightly
-# docker tag distributedlog:nightly $DOCKER_ORG/distributedlog:$MVN_VERSION
+docker tag distributedlog:latest $DOCKER_ORG/distributedlog:latest
+# docker tag distributedlog:latest $DOCKER_ORG/distributedlog:$MVN_VERSION
 
 # Push all images and tags
-docker push $DOCKER_ORG/distributedlog:nightly
+docker push $DOCKER_ORG/distributedlog:latest
 # docker push $DOCKER_ORG/distributedlog:$MVN_VERSION
diff --git a/pom.xml b/pom.xml
index 2db3f7b..550215e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,7 +99,7 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <!-- dependencies -->
-    <bookkeeper.version>4.5.0</bookkeeper.version>
+    <bookkeeper.version>4.6.0-SNAPSHOT</bookkeeper.version>
     <codahale.metrics.version>3.0.1</codahale.metrics.version>
     <commons-cli.version>1.1</commons-cli.version>
     <commons-codec.version>1.6</commons-codec.version>
diff --git a/website/docs/latest/deployment/docker.rst b/website/docs/latest/deployment/docker.rst
deleted file mode 100644
index d9fd87a..0000000
--- a/website/docs/latest/deployment/docker.rst
+++ /dev/null
@@ -1,49 +0,0 @@
----
-title: Docker
-top-nav-group: deployment
-top-nav-pos: 2
-top-nav-title: Docker
-layout: default
----
-
-.. contents:: This page provides instructions on how to deploy **DistributedLog** using docker.
-
-Docker Setup
-============
-
-Prerequesites
--------------
-1. Docker
-
-Steps
------
-1. Create a snapshot using
-
-.. code-block:: bash
-
-    ./scripts/snapshot
-
-
-2. Create your own docker image using
-
-.. code-block:: bash
-
-    docker build -t <your image name> .
-
-
-3. You can run the docker container using
-
-.. code-block:: bash
-
-    docker run -e ZK_SERVERS=<zk server list> -e DEPLOY_BK=<true|false> -e DEPLOY_WP=<true|false> <your image name>
-
-
-Environment variables
----------------------
-
-Following are the environment variables which can change how the docker container runs.
-
-1. **ZK_SERVERS**: ZK servers running exernally (the container does not run a zookeeper)
-2. **DEPLOY_BOTH**: Deploys writeproxies as well as the bookies
-3. **DEPLOY_WP**: Flag to notify that a writeproxy needs to be deployed
-4. **DEPLOY_BK**: Flag to notify that a bookie needs to be deployed
diff --git a/website/docs/latest/deployment/kubernetes.md b/website/docs/latest/deployment/kubernetes.md
new file mode 100644
index 0000000..a04229a
--- /dev/null
+++ b/website/docs/latest/deployment/kubernetes.md
@@ -0,0 +1,241 @@
+---
+title: Kubernetes
+top-nav-group: deployment
+top-nav-pos: 3
+top-nav-title: Kubernetes
+layout: default
+---
+
+Apache DistributedLog can be easily deployed on [Kubernetes](https://kubernetes.io/) clusters. A managed cluster on [Google Container Engine](https://cloud.google.com/compute/) is the most convenient way to get started.
+
+The deployment method shown in this guide relies on [YAML](http://yaml.org/) definitions for Kubernetes [resources](https://kubernetes.io/docs/resources-reference/v1.6/). The [`kubernetes`](https://github.com/apache/distributedlog/tree/master/deploy/kubernetes) subdirectory holds resource definitions for:
+
+* A three-node ZooKeeper cluster
+* A BookKeeper cluster with a bookie running on each node
+* A three-node proxy cluster
+
+If you have already set up a BookKeeper cluster by following the instructions in [Deploying Apache BookKeeper on Kubernetes](http://bookkeeper.apache.org/docs/latest/deployment/kubernetes/) on the Apache BookKeeper website,
+you can skip deploying BookKeeper and start from [Create a DistributedLog Namespace](#create-distributedlog-namespace).
+
+## Setup on Google Container Engine
+
+To get started, clone the [`kubernetes`](https://github.com/apache/distributedlog/tree/master/deploy/kubernetes) YAML resource definitions from the GitHub repository.
+
+If you'd like to change the number of bookies, ZooKeeper nodes, or proxy nodes in your deployment, modify the `replicas` parameter in the `spec` section of the appropriate [`Deployment`](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) or [`StatefulSet`](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) resource.
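+For example, instead of editing the YAML file you could scale an existing `Deployment` in place. The resource name below is a placeholder, not a name defined by the provided files; find the real one with `kubectl get deployments`:
+
+```bash
+# Scale the proxy deployment to 5 replicas (replace the placeholder with the actual name)
+$ kubectl scale deployment <proxy-deployment-name> --replicas=5
+```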
+
+[Google Container Engine](https://cloud.google.com/container-engine) (GKE) automates the creation and management of Kubernetes clusters in [Google Compute Engine](https://cloud.google.com/compute/) (GCE).
+
+### Prerequisites
+
+To get started, you'll need:
+
+* A Google Cloud Platform account, which you can sign up for at [cloud.google.com](https://cloud.google.com)
+* An existing Cloud Platform project
+* The [Google Cloud SDK](https://cloud.google.com/sdk/downloads) (in particular the [`gcloud`](https://cloud.google.com/sdk/gcloud/) and `kubectl` tools).
+
+### Create a new Kubernetes cluster
+
+You can create a new GKE cluster using the [`container clusters create`](https://cloud.google.com/sdk/gcloud/reference/container/clusters/create) command for `gcloud`. This command enables you to specify the number of nodes in the cluster, the machine types of those nodes, and more.
+
+As an example, we'll create a new GKE cluster for Kubernetes version [1.7.5](https://github.com/kubernetes/kubernetes/blob/master/CHANGELOG.md#v175) in the [us-central1-a](https://cloud.google.com/compute/docs/regions-zones/regions-zones#available) zone. The cluster will be named `bookkeeper-gke-cluster` and will consist of three VMs, each using two locally attached SSDs and running on [n1-standard-8](https://cloud.google.com/compute/docs/machine-types) machines. These SSDs will be used by Bookie instances, one for the BookKeeper journal and the other for storing the actual data.
+
+```bash
+$ gcloud config set compute/zone us-central1-a
+$ gcloud config set project your-project-name
+$ gcloud container clusters create bookkeeper-gke-cluster \
+  --machine-type=n1-standard-8 \
+  --num-nodes=3 \
+  --local-ssd-count=2 \
+  --cluster-version=1.7.5
+```
+
+By default, bookies will run on all the machines that have locally attached SSD disks. In this example, all of those machines will have two SSDs, but you can add different types of machines to the cluster later. You can control which machines host bookie servers using [labels](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels).
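+As a sketch of how label-based placement could work (the label key and value below are assumptions, not names used by the provided YAML files): label the nodes that should host bookies, then add a matching `nodeSelector` to the bookie pod spec.
+
+```bash
+# Attach a label to a node that should run bookies (replace <node-name> with a real node)
+$ kubectl label nodes <node-name> dedicated=bookie
+```
+
+For the label to take effect, the bookie pod template in `bookkeeper.yaml` would then need a matching selector such as `nodeSelector: {dedicated: bookie}`.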
+
+### Dashboard
+
+You can observe your cluster in the [Kubernetes Dashboard](https://kubernetes.io/docs/tasks/access-application-cluster/web-ui-dashboard/) by downloading the credentials for your Kubernetes cluster and opening up a proxy to the cluster:
+
+```bash
+$ gcloud container clusters get-credentials bookkeeper-gke-cluster \
+  --zone=us-central1-a \
+  --project=your-project-name
+$ kubectl proxy
+```
+
+By default, the proxy will be opened on port 8001. Now you can navigate to [localhost:8001/ui](http://localhost:8001/ui) in your browser to access the dashboard. At first your GKE cluster will be empty, but that will change as you begin deploying.
+
+When you create a cluster, your `kubectl` config in `~/.kube/config` (on MacOS and Linux) will be updated for you, so you probably won't need to change your configuration. Nonetheless, you can ensure that `kubectl` can interact with your cluster by listing the nodes in the cluster:
+
+```bash
+$ kubectl get nodes
+```
+
+If `kubectl` is working with your cluster, you can proceed to deploy ZooKeeper and Bookies.
+
+### ZooKeeper
+
+You *must* deploy ZooKeeper as the first component, as it is a dependency for the others.
+
+```bash
+$ kubectl apply -f zookeeper.yaml
+```
+
+Wait until all three ZooKeeper server pods are up and have the status `Running`. You can check on the status of the ZooKeeper pods at any time:
+
+```bash
+$ kubectl get pods -l component=zookeeper
+NAME      READY     STATUS             RESTARTS   AGE
+zk-0      1/1       Running            0          18m
+zk-1      1/1       Running            0          17m
+zk-2      0/1       Running            6          15m
+```
+
+This step may take several minutes, as Kubernetes needs to download the Docker image on the VMs.
+
+
+If you want to connect to one of the remote ZooKeeper servers with [zk-shell](https://github.com/rgs1/zk_shell), first forward a local port to the
+remote ZooKeeper server:
+
+```bash
+$ kubectl port-forward zk-0 2181:2181
+$ zk-shell localhost 2181
+```
+
+### Deploy Bookies
+
+Once the ZooKeeper cluster is running, you can deploy the bookies.
+
+```bash
+$ kubectl apply -f bookkeeper.yaml
+```
+
+You can check on the status of the bookie pods either in the Kubernetes Dashboard or using `kubectl`:
+
+```bash
+$ kubectl get pods
+```
+
+Once all BookKeeper pods are running, you can use zk-shell to find the available bookies registered under `/ledgers/`.
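+For example, with a port-forward to one of the ZooKeeper pods in place (as shown in the ZooKeeper section above), you could list the registered bookies. The `/ledgers/available` path is the BookKeeper default and is an assumption about this deployment's configuration:
+
+```bash
+$ kubectl port-forward zk-0 2181:2181 &
+$ zk-shell localhost 2181
+(CONNECTED) /> ls /ledgers/available
+```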
+
+You can also verify the deployment by opening a shell in a bookie pod:
+
+```bash
+$ kubectl exec -it <pod_name> -- bash
+```
+
+On the bookie pod, you can run `simpletest` to verify the installation. It creates a ledger and appends a few entries to it.
+
+```bash
+$ BOOKIE_CONF=/opt/bookkeeper/conf/bk_server.conf /opt/distributedlog/bin/dlog bkshell simpletest
+```
+
+### Monitoring
+
+Apache BookKeeper provides pluggable stats providers for integrating with different monitoring systems. The default monitoring stack for Apache BookKeeper
+on Kubernetes consists of [Prometheus](https://prometheus.io/) and [Grafana](https://grafana.com/).
+
+You can deploy one instance of Prometheus and one instance of Grafana by running the following command:
+
+```bash
+$ kubectl apply -f monitoring.yaml
+```
+
+#### Prometheus
+
+All BookKeeper/DistributedLog metrics in Kubernetes are collected by a Prometheus instance running inside the cluster. Typically, there is no need to access
+Prometheus directly. Instead, you can use the Grafana interface that displays the data stored in Prometheus.
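+If you do need to query Prometheus directly (for example, to debug a missing metric), you can port-forward to the Prometheus pod in the same way as for Grafana below. Port 9090 is the Prometheus default and is an assumption about how `monitoring.yaml` configures it:
+
+```bash
+$ kubectl port-forward $(kubectl get pods | grep prometheus | awk '{print $1}') 9090
+```
+
+Then browse to [localhost:9090](http://localhost:9090).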
+
+#### Grafana
+
+In your Kubernetes cluster, you can use Grafana to view dashboards for JVM stats, ZooKeeper, and BookKeeper. You can access the pod serving Grafana
+using kubectl’s port-forward command:
+
+```bash
+$ kubectl port-forward $(kubectl get pods | grep grafana | awk '{print $1}') 3000
+```
+
+You can then access the dashboard in your web browser at [localhost:3000](http://localhost:3000).
+
+### Create DistributedLog Namespace
+
+At this point, you have a BookKeeper cluster up and running on Kubernetes, and you can create a DistributedLog namespace and start playing with it.
+If you set up the BookKeeper cluster following the instructions above, it uses the `apachedistributedlog/distributedlog:0.5.0` image for running bookies,
+which already creates a default namespace `distributedlog://zookeeper/distributedlog` when starting the bookies. In that case, you can skip creating
+a namespace here and move on to the next section.
+
+You can create a distributedlog namespace using the `dlog` tool.
+
+```bash
+$ kubectl run dlog --rm=true --attach --image=apachedistributedlog/distributedlog:0.5.0 --restart=OnFailure -- /opt/distributedlog/bin/dlog admin bind -l /bookkeeper/ledgers -s zookeeper -c distributedlog://zookeeper/distributedlog
+```
+
+Once you have a DistributedLog namespace, you can use the `dlog` tool to create, delete, list, and show the streams in it.
+
+#### Create Streams
+
+Create 10 streams prefixed with `mystream-`.
+
+```bash
+$ kubectl run dlog --rm=true --attach --image=apachedistributedlog/distributedlog:0.5.0 --restart=OnFailure -- /opt/distributedlog/bin/dlog tool create -u distributedlog://zookeeper/distributedlog -r mystream- -e 0-9 -f
+```
+
+#### List Streams
+
+List the streams under the namespace.
+
+```bash
+$ kubectl run dlog --rm=true --attach --image=apachedistributedlog/distributedlog:0.5.0 --restart=OnFailure -- /opt/distributedlog/bin/dlog tool list -u distributedlog://zookeeper/distributedlog
+```
+
+An example of the output of this command is:
+
+```
+Streams under distributedlog://zookeeper/distributedlog :
+--------------------------------
+mystream-0
+mystream-9
+mystream-6
+mystream-5
+mystream-8
+mystream-7
+mystream-2
+mystream-1
+mystream-4
+mystream-3
+--------------------------------
+```
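+The `dlog` tool also supports showing and deleting individual streams, following the same pattern as above. The exact sub-command flags below are assumptions; run the `dlog` tool without arguments to confirm them:
+
+```bash
+# Show details of one stream
+$ kubectl run dlog --rm=true --attach --image=apachedistributedlog/distributedlog:0.5.0 --restart=OnFailure -- /opt/distributedlog/bin/dlog tool show -u distributedlog://zookeeper/distributedlog -s mystream-0
+
+# Delete one stream
+$ kubectl run dlog --rm=true --attach --image=apachedistributedlog/distributedlog:0.5.0 --restart=OnFailure -- /opt/distributedlog/bin/dlog tool delete -u distributedlog://zookeeper/distributedlog -s mystream-0
+```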
+
+### Write and Read Records
+
+You can run a simple benchmark to test writing to and reading from DistributedLog streams.
+
+Start one instance of benchmark-writer to write to 100 streams. (The streams are created automatically by the benchmark writer.)
+
+```bash
+$ kubectl apply -f benchmark-writer.yaml
+```
+
+Start one instance of benchmark-reader to read from those 100 streams.
+
+```bash
+$ kubectl apply -f benchmark-reader.yaml
+```
+
+You can monitor the traffic coming from the benchmark writer and reader on the Grafana dashboard.
+
+### Un-Deploy
+
+Delete BookKeeper
+```bash
+$ kubectl delete -f bookkeeper.yaml
+```
+
+Delete ZooKeeper
+```bash
+$ kubectl delete -f zookeeper.yaml
+```
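+If you deployed the monitoring stack earlier, remove it the same way:
+
+```bash
+$ kubectl delete -f monitoring.yaml
+```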
+
+Delete cluster
+```bash
+$ gcloud container clusters delete bookkeeper-gke-cluster
+```
diff --git a/website/docs/latest/index.md b/website/docs/latest/index.md
index dba7bba..8d1c492 100644
--- a/website/docs/latest/index.md
+++ b/website/docs/latest/index.md
@@ -36,7 +36,7 @@
 
 - **Quickstarts**: [Run DistributedLog]({{ site.baseurl }}/start/quickstart) on your local machine or follow the tutorial to [write a simple program]({{ site.baseurl }}/tutorials/basic-1) to interact with _DistributedLog_.
 
-- **Setup**: The [docker]({{ site.baseurl }}/deployment/docker) and [cluster]({{ site.baseurl }}/deployment/cluster) setup guides show how to deploy DistributedLog Stack.
+- **Setup**: The [kubernetes]({{ site.baseurl }}/deployment/kubernetes) and [cluster]({{ site.baseurl }}/deployment/cluster) setup guides show how to deploy DistributedLog Stack.
 
 - **Programming Guide**: You can check out our guides about [basic concepts]({{ site.baseurl }}/basics/introduction) and the [Core Library API]({{ site.baseurl }}/user_guide/api/core) or [Proxy Client API]({{ site.baseurl }}/user_guide/api/proxy) to learn how to use DistributedLog to build your reliable real-time services.