/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.impl.metadata;

import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.distributedlog.DistributedLogConstants.EMPTY_BYTES;
import static org.apache.distributedlog.DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO;
import static org.apache.distributedlog.metadata.LogMetadata.*;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClient.ZooKeeperConnectionException;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.common.util.PermitManager;
import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LockCancelledException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.LogExistsException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
import org.apache.distributedlog.lock.DistributedLock;
import org.apache.distributedlog.lock.SessionLockFactory;
import org.apache.distributedlog.lock.ZKDistributedLock;
import org.apache.distributedlog.lock.ZKSessionLockFactory;
import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.metadata.LogMetadataForReader;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.metadata.LogStreamMetadataStore;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.LimitedPermitManager;
import org.apache.distributedlog.zk.ZKTransaction;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.Op.Create;
import org.apache.zookeeper.Op.Delete;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * zookeeper based {@link LogStreamMetadataStore}.
 */
public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {

    private static final  Logger LOG = LoggerFactory.getLogger(ZKLogStreamMetadataStore.class);

    private final String clientId;
    private final DistributedLogConfiguration conf;
    private final ZooKeeperClient zooKeeperClient;
    private final OrderedScheduler scheduler;
    private final StatsLogger statsLogger;
    private final LogSegmentMetadataStore logSegmentStore;
    private final LimitedPermitManager permitManager;
    // lock
    private SessionLockFactory lockFactory;
    private OrderedScheduler lockStateExecutor;

    public ZKLogStreamMetadataStore(String clientId,
                                    DistributedLogConfiguration conf,
                                    ZooKeeperClient zkc,
                                    OrderedScheduler scheduler,
                                    StatsLogger statsLogger) {
        this.clientId = clientId;
        this.conf = conf;
        this.zooKeeperClient = zkc;
        this.scheduler = scheduler;
        this.statsLogger = statsLogger;
        // create the log segment metadata store and the permit manager (used for log segment rolling)
        this.logSegmentStore = new ZKLogSegmentMetadataStore(conf, zooKeeperClient, scheduler);
        this.permitManager = new LimitedPermitManager(
                conf.getLogSegmentRollingConcurrency(),
                1,
                TimeUnit.MINUTES,
                scheduler);
        this.zooKeeperClient.register(permitManager);
    }

    private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
        if (createIfNull && null == lockStateExecutor) {
            lockStateExecutor = OrderedScheduler.newBuilder()
                    .name("DLM-LockState")
                    .corePoolSize(conf.getNumLockStateThreads())
                    .build();
        }
        return lockStateExecutor;
    }

    private synchronized SessionLockFactory getLockFactory(boolean createIfNull) {
        if (createIfNull && null == lockFactory) {
            lockFactory = new ZKSessionLockFactory(
                    zooKeeperClient,
                    clientId,
                    getLockStateExecutor(createIfNull),
                    conf.getZKNumRetries(),
                    conf.getLockTimeoutMilliSeconds(),
                    conf.getZKRetryBackoffStartMillis(),
                    statsLogger);
        }
        return lockFactory;
    }

    @Override
    public void close() throws IOException {
        this.zooKeeperClient.unregister(permitManager);
        this.permitManager.close();
        this.logSegmentStore.close();
        SchedulerUtils.shutdownScheduler(
                getLockStateExecutor(false),
                conf.getSchedulerShutdownTimeoutMs(),
                TimeUnit.MILLISECONDS);
    }

    @Override
    public LogSegmentMetadataStore getLogSegmentMetadataStore() {
        return logSegmentStore;
    }

    @Override
    public PermitManager getPermitManager() {
        return this.permitManager;
    }

    @Override
    public Transaction<Object> newTransaction() {
        return new ZKTransaction(zooKeeperClient);
    }

    @Override
    public CompletableFuture<Void> logExists(URI uri, final String logName) {
        final String logSegmentsPath = LogMetadata.getLogSegmentsPath(
                uri, logName, conf.getUnpartitionedStreamName());
        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
        try {
            final ZooKeeper zk = zooKeeperClient.get();
            zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() {
                @Override
                public void processResult(int syncRc, String path, Object syncCtx) {
                    if (KeeperException.Code.NONODE.intValue() == syncRc) {
                        promise.completeExceptionally(new LogNotFoundException(
                                String.format("Log %s does not exist or has been deleted", logName)));
                        return;
                    } else if (KeeperException.Code.OK.intValue() != syncRc){
                        promise.completeExceptionally(new ZKException("Error on checking log existence for " + logName,
                                KeeperException.create(KeeperException.Code.get(syncRc))));
                        return;
                    }
                    zk.exists(logSegmentsPath, false, new AsyncCallback.StatCallback() {
                        @Override
                        public void processResult(int rc, String path, Object ctx, Stat stat) {
                            if (KeeperException.Code.OK.intValue() == rc) {
                                promise.complete(null);
                            } else if (KeeperException.Code.NONODE.intValue() == rc) {
                                promise.completeExceptionally(new LogNotFoundException(
                                        String.format("Log %s does not exist or has been deleted", logName)));
                            } else {
                                promise.completeExceptionally(
                                        new ZKException("Error on checking log existence for "
                                                + logName, KeeperException.create(KeeperException.Code.get(rc))));
                            }
                        }
                    }, null);
                }
            }, null);

        } catch (InterruptedException ie) {
            LOG.error("Interrupted while reading {}", logSegmentsPath, ie);
            promise.completeExceptionally(new DLInterruptedException("Interrupted while checking "
                    + logSegmentsPath, ie));
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            promise.completeExceptionally(e);
        }
        return promise;
    }

    //
    // Create Write Lock
    //

    @Override
    public DistributedLock createWriteLock(LogMetadataForWriter metadata) {
        return new ZKDistributedLock(
                getLockStateExecutor(true),
                getLockFactory(true),
                metadata.getLockPath(),
                conf.getLockTimeoutMilliSeconds(),
                statsLogger);
    }

    //
    // Create Read Lock
    //

    private CompletableFuture<Void> ensureReadLockPathExist(final LogMetadata logMetadata,
                                                 final String readLockPath) {
        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
        promise.whenComplete((value, cause) -> {
            if (cause instanceof CancellationException) {
                FutureUtils.completeExceptionally(promise, new LockCancelledException(readLockPath,
                        "Could not ensure read lock path", cause));
            }
        });
        Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
        Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
                new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
                new org.apache.zookeeper.AsyncCallback.StringCallback() {
                    @Override
                    public void processResult(final int rc, final String path, Object ctx, String name) {
                        if (KeeperException.Code.NONODE.intValue() == rc) {
                            FutureUtils.completeExceptionally(promise, new LogNotFoundException(
                                    String.format("Log %s does not exist or has been deleted",
                                            logMetadata.getFullyQualifiedName())));
                        } else if (KeeperException.Code.OK.intValue() == rc) {
                            FutureUtils.complete(promise, null);
                            LOG.trace("Created path {}.", path);
                        } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                            FutureUtils.complete(promise, null);
                            LOG.trace("Path {} is already existed.", path);
                        } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
                            FutureUtils.completeExceptionally(promise,
                                    new ZooKeeperClient.ZooKeeperConnectionException(path));
                        } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
                            FutureUtils.completeExceptionally(promise, new DLInterruptedException(path));
                        } else {
                            FutureUtils.completeExceptionally(promise,
                                    KeeperException.create(KeeperException.Code.get(rc)));
                        }
                    }
                }, null);
        return promise;
    }

    @Override
    public CompletableFuture<DistributedLock> createReadLock(final LogMetadataForReader metadata,
                                                  Optional<String> readerId) {
        final String readLockPath = metadata.getReadLockPath(readerId);
        return ensureReadLockPathExist(metadata, readLockPath)
            .thenApplyAsync((value) -> {
                DistributedLock lock = new ZKDistributedLock(
                    getLockStateExecutor(true),
                    getLockFactory(true),
                    readLockPath,
                    conf.getLockTimeoutMilliSeconds(),
                    statsLogger.scope("read_lock"));
                return lock;
            }, scheduler.chooseExecutor(readLockPath));
    }

    //
    // Create Log
    //

    static class MetadataIndex {
        static final int LOG_ROOT_PARENT = 0;
        static final int LOG_ROOT = 1;
        static final int MAX_TXID = 2;
        static final int VERSION = 3;
        static final int LOCK = 4;
        static final int READ_LOCK = 5;
        static final int LOGSEGMENTS = 6;
        static final int ALLOCATION = 7;
    }

    static int bytesToInt(byte[] b) {
        assert b.length >= 4;
        return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
    }

    static byte[] intToBytes(int i) {
        return new byte[]{
            (byte) (i >> 24),
            (byte) (i >> 16),
            (byte) (i >> 8),
            (byte) (i)};
    }

    static CompletableFuture<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
                                                                 String logRootPath,
                                                                 boolean ownAllocator) {
        // Note re. persistent lock state initialization: the read lock persistent state (path) is
        // initialized here but only used in the read handler. The reason is its more convenient and
        // less error prone to manage all stream structure in one place.
        final String logRootParentPath = Utils.getParent(logRootPath);
        final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
        final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
        final String lockPath = logRootPath + LOCK_PATH;
        final String readLockPath = logRootPath + READ_LOCK_PATH;
        final String versionPath = logRootPath + VERSION_PATH;
        final String allocationPath = logRootPath + ALLOCATION_PATH;

        int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
        List<CompletableFuture<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
        checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false));
        checkFutures.add(Utils.zkGetData(zk, logRootPath, false));
        checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false));
        checkFutures.add(Utils.zkGetData(zk, versionPath, false));
        checkFutures.add(Utils.zkGetData(zk, lockPath, false));
        checkFutures.add(Utils.zkGetData(zk, readLockPath, false));
        checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false));
        if (ownAllocator) {
            checkFutures.add(Utils.zkGetData(zk, allocationPath, false));
        }

        return FutureUtils.collect(checkFutures);
    }

    static boolean pathExists(Versioned<byte[]> metadata) {
        return null != metadata.getValue() && null != metadata.getVersion();
    }

    static void ensureMetadataExist(Versioned<byte[]> metadata) {
        checkNotNull(metadata.getValue());
        checkNotNull(metadata.getVersion());
    }

    static void createMissingMetadata(final ZooKeeper zk,
                                      final String basePath,
                                      final String logRootPath,
                                      final List<Versioned<byte[]>> metadatas,
                                      final List<ACL> acl,
                                      final boolean ownAllocator,
                                      final boolean createIfNotExists,
                                      final CompletableFuture<List<Versioned<byte[]>>> promise) {
        final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
        final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
        CreateMode createMode = CreateMode.PERSISTENT;

        // log root parent path
        String logRootParentPath = Utils.getParent(logRootPath);
        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
            pathsToCreate.add(null);
        } else {
            pathsToCreate.add(EMPTY_BYTES);
            zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode));
        }

        // log root path
        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
            pathsToCreate.add(null);
        } else {
            pathsToCreate.add(EMPTY_BYTES);
            zkOps.add(Op.create(logRootPath, EMPTY_BYTES, acl, createMode));
        }

        // max id
        if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
            pathsToCreate.add(null);
        } else {
            byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
            pathsToCreate.add(zeroTxnIdData);
            zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
        }
        // version
        if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
            pathsToCreate.add(null);
        } else {
            byte[] versionData = intToBytes(LAYOUT_VERSION);
            pathsToCreate.add(versionData);
            zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
        }
        // lock path
        if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
            pathsToCreate.add(null);
        } else {
            pathsToCreate.add(EMPTY_BYTES);
            zkOps.add(Op.create(logRootPath + LOCK_PATH, EMPTY_BYTES, acl, createMode));
        }
        // read lock path
        if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
            pathsToCreate.add(null);
        } else {
            pathsToCreate.add(EMPTY_BYTES);
            zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, EMPTY_BYTES, acl, createMode));
        }
        // log segments path
        if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
            pathsToCreate.add(null);
        } else {
            byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
                DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
            pathsToCreate.add(logSegmentsData);
            zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
        }
        // allocation path
        if (ownAllocator) {
            if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
                pathsToCreate.add(null);
            } else {
                pathsToCreate.add(EMPTY_BYTES);
                zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
                    EMPTY_BYTES, acl, createMode));
            }
        }
        if (zkOps.isEmpty()) {
            // nothing missed
            promise.complete(metadatas);
            return;
        }
        if (!createIfNotExists) {
            promise.completeExceptionally(new LogNotFoundException("Log " + logRootPath + " not found"));
            return;
        }

        getMissingPaths(zk, basePath, Utils.getParent(logRootParentPath))
            .whenComplete(new FutureEventListener<List<String>>() {
                @Override
                public void onSuccess(List<String> paths) {
                    for (String path : paths) {
                        pathsToCreate.add(EMPTY_BYTES);
                        zkOps.add(
                            0, Op.create(path, EMPTY_BYTES, acl, createMode));
                    }
                    executeCreateMissingPathTxn(
                        zk,
                        zkOps,
                        pathsToCreate,
                        metadatas,
                        logRootPath,
                        promise
                    );
                }

                @Override
                public void onFailure(Throwable cause) {
                    promise.completeExceptionally(cause);
                    return;
                }
            });

    }

    private static void executeCreateMissingPathTxn(ZooKeeper zk,
                                                    List<Op> zkOps,
                                                    List<byte[]> pathsToCreate,
                                                    List<Versioned<byte[]>> metadatas,
                                                    String logRootPath,
                                                    CompletableFuture<List<Versioned<byte[]>>> promise) {

        zk.multi(zkOps, new AsyncCallback.MultiCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
                if (KeeperException.Code.OK.intValue() == rc) {
                    List<Versioned<byte[]>> finalMetadatas =
                            Lists.newArrayListWithExpectedSize(metadatas.size());
                    for (int i = 0; i < pathsToCreate.size(); i++) {
                        byte[] dataCreated = pathsToCreate.get(i);
                        if (null == dataCreated) {
                            finalMetadatas.add(metadatas.get(i));
                        } else {
                            finalMetadatas.add(new Versioned<byte[]>(dataCreated, new LongVersion(0)));
                        }
                    }
                    promise.complete(finalMetadatas);
                } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                    promise.completeExceptionally(new LogExistsException("Someone just created log "
                            + logRootPath));
                } else {
                    if (LOG.isDebugEnabled()) {
                        StringBuilder builder = new StringBuilder();
                        for (OpResult result : resultList) {
                            if (result instanceof OpResult.ErrorResult) {
                                OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
                                builder.append(errorResult.getErr()).append(",");
                            } else {
                                builder.append(0).append(",");
                            }
                        }
                        String resultCodeList = builder.substring(0, builder.length() - 1);
                        LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
                    }

                    promise.completeExceptionally(new ZKException("Failed to create log " + logRootPath,
                            KeeperException.Code.get(rc)));
                }
            }
        }, null);
    }

    static LogMetadataForWriter processLogMetadatas(URI uri,
                                                    String logName,
                                                    String logIdentifier,
                                                    List<Versioned<byte[]>> metadatas,
                                                    boolean ownAllocator)
            throws UnexpectedException {
        try {
            // max id
            Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
            ensureMetadataExist(maxTxnIdData);
            // version
            Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
            ensureMetadataExist(maxTxnIdData);
            checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
            // lock path
            ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
            // read lock path
            ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
            // max lssn
            Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
            ensureMetadataExist(maxLSSNData);
            try {
                DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
            } catch (NumberFormatException nfe) {
                throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
            }
            // allocation path
            Versioned<byte[]>  allocationData;
            if (ownAllocator) {
                allocationData = metadatas.get(MetadataIndex.ALLOCATION);
                ensureMetadataExist(allocationData);
            } else {
                allocationData = new Versioned<byte[]>(null, null);
            }
            return new LogMetadataForWriter(uri, logName, logIdentifier,
                    maxLSSNData, maxTxnIdData, allocationData);
        } catch (IllegalArgumentException iae) {
            throw new UnexpectedException("Invalid log " + logName, iae);
        } catch (NullPointerException npe) {
            throw new UnexpectedException("Invalid log " + logName, npe);
        }
    }

    static CompletableFuture<LogMetadataForWriter> getLog(final URI uri,
                                               final String logName,
                                               final String logIdentifier,
                                               final ZooKeeperClient zooKeeperClient,
                                               final boolean ownAllocator,
                                               final boolean createIfNotExists) {
        final String logRootPath = LogMetadata.getLogRootPath(uri, logName, logIdentifier);
        try {
            PathUtils.validatePath(logRootPath);
        } catch (IllegalArgumentException e) {
            LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
            return FutureUtils.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
        }

        try {
            final ZooKeeper zk = zooKeeperClient.get();
            return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
                    .thenCompose(metadatas -> {
                        CompletableFuture<List<Versioned<byte[]>>> promise =
                                new CompletableFuture<List<Versioned<byte[]>>>();
                        createMissingMetadata(
                            zk,
                            uri.getPath(),
                            logRootPath,
                            metadatas,
                            zooKeeperClient.getDefaultACL(),
                            ownAllocator,
                            createIfNotExists,
                            promise);
                        return promise;
                    }).thenCompose(metadatas -> {
                        try {
                            return FutureUtils.value(
                                processLogMetadatas(
                                    uri,
                                    logName,
                                    logIdentifier,
                                    metadatas,
                                    ownAllocator));
                        } catch (UnexpectedException e) {
                            return FutureUtils.exception(e);
                        }
                    });
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            return FutureUtils.exception(
                    new ZKException("Encountered zookeeper connection issue on creating log "
                            + logName, KeeperException.Code.CONNECTIONLOSS));
        } catch (InterruptedException e) {
            return FutureUtils.exception(new DLInterruptedException("Interrupted on creating log " + logName, e));
        }
    }

    @Override
    public CompletableFuture<LogMetadataForWriter> getLog(final URI uri,
                                               final String logName,
                                               final boolean ownAllocator,
                                               final boolean createIfNotExists) {
        return getLog(
                uri,
                logName,
                conf.getUnpartitionedStreamName(),
                zooKeeperClient,
                ownAllocator,
                createIfNotExists);
    }

    //
    // Delete Log
    //

    @Override
    public CompletableFuture<Void> deleteLog(URI uri, final String logName) {
        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
        try {
            String streamPath = LogMetadata.getLogStreamPath(uri, logName);
            ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx) {
                    if (KeeperException.Code.OK.intValue() != rc) {
                        FutureUtils.completeExceptionally(promise,
                                new ZKException("Encountered zookeeper issue on deleting log stream "
                                        + logName, KeeperException.Code.get(rc)));
                        return;
                    }
                    FutureUtils.complete(promise, null);
                }
            }, null);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            FutureUtils.completeExceptionally(promise,
                    new ZKException("Encountered zookeeper issue on deleting log stream "
                            + logName, KeeperException.Code.CONNECTIONLOSS));
        } catch (InterruptedException e) {
            FutureUtils.completeExceptionally(promise,
                    new DLInterruptedException("Interrupted while deleting log stream " + logName));
        } catch (KeeperException e) {
            FutureUtils.completeExceptionally(promise,
                    new ZKException("Encountered zookeeper issue on deleting log stream "
                    + logName, e));
        }
        return promise;
    }

    //
    // Rename Log
    //

    @Override
    public CompletableFuture<Void> renameLog(URI uri, String oldStreamName, String newStreamName) {
        return getLog(
            uri,
            oldStreamName,
            true,
            false
        ).thenCompose(metadata -> renameLogMetadata(uri, metadata, newStreamName));
    }

    private CompletableFuture<Void> renameLogMetadata(URI uri,
                                                      LogMetadataForWriter oldMetadata,
                                                      String newStreamName) {


        final LinkedList<Op> createOps = Lists.newLinkedList();
        final LinkedList<Op> deleteOps = Lists.newLinkedList();

        List<ACL> acls = zooKeeperClient.getDefaultACL();

        // get the root path
        String oldRootPath = oldMetadata.getLogRootPath();
        String newRootPath = LogMetadata.getLogRootPath(
            uri, newStreamName, conf.getUnpartitionedStreamName());

        // 0. the log path
        deleteOps.addFirst(Op.delete(
            LogMetadata.getLogStreamPath(uri, oldMetadata.getLogName()), -1));

        // 1. the root path
        createOps.addLast(Op.create(
            newRootPath, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
        deleteOps.addFirst(Op.delete(
            oldRootPath, -1));

        // 2. max id
        Versioned<byte[]> maxTxIdData = oldMetadata.getMaxTxIdData();
        deleteOldPathAndCreateNewPath(
            oldRootPath, MAX_TXID_PATH, maxTxIdData,
            newRootPath, DLUtils.serializeTransactionId(0L), acls,
            createOps, deleteOps
        );

        // 3. version
        createOps.addLast(Op.create(
            newRootPath + VERSION_PATH, intToBytes(LAYOUT_VERSION), acls, CreateMode.PERSISTENT));
        deleteOps.addFirst(Op.delete(
            oldRootPath + VERSION_PATH, -1));

        // 4. lock path (NOTE: if the stream is locked by a writer, then the delete will fail as you can not
        //    delete the lock path if children is not empty.
        createOps.addLast(Op.create(
            newRootPath + LOCK_PATH, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
        deleteOps.addFirst(Op.delete(
            oldRootPath + LOCK_PATH, -1));

        // 5. read lock path (NOTE: same reason as the write lock)
        createOps.addLast(Op.create(
            newRootPath + READ_LOCK_PATH, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
        deleteOps.addFirst(Op.delete(
            oldRootPath + READ_LOCK_PATH, -1));

        // 6. allocation path
        Versioned<byte[]> allocationData = oldMetadata.getAllocationData();
        deleteOldPathAndCreateNewPath(
            oldRootPath, ALLOCATION_PATH, allocationData,
            newRootPath, EMPTY_BYTES, acls,
            createOps, deleteOps);

        // 7. log segments
        Versioned<byte[]> maxLSSNData = oldMetadata.getMaxLSSNData();
        deleteOldPathAndCreateNewPath(
            oldRootPath, LOGSEGMENTS_PATH, maxLSSNData,
            newRootPath, DLUtils.serializeLogSegmentSequenceNumber(UNASSIGNED_LOGSEGMENT_SEQNO), acls,
            createOps, deleteOps);

        // 8. copy the log segments
        CompletableFuture<List<LogSegmentMetadata>> segmentsFuture;
        if (pathExists(maxLSSNData)) {
            segmentsFuture = getLogSegments(zooKeeperClient, oldRootPath + LOGSEGMENTS_PATH);
        } else {
            segmentsFuture = FutureUtils.value(Collections.emptyList());
        }
        return segmentsFuture
            // copy the segments
            .thenApply(segments -> {
                for (LogSegmentMetadata segment : segments) {
                    deleteOldSegmentAndCreateNewSegment(
                        segment,
                        newRootPath + LOGSEGMENTS_PATH,
                        acls,
                        createOps,
                        deleteOps);
                }
                return null;
            })
            // get the missing paths
            .thenCompose(ignored ->
                getMissingPaths(zooKeeperClient, uri, newStreamName)
            )
            // create the missing paths and execute the rename transaction
            .thenCompose(paths -> {
                for (String path : paths) {
                    createOps.addFirst(Op.create(
                        path, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
                }
                return executeRenameTxn(oldRootPath, newRootPath, createOps, deleteOps);
            });
    }

    @VisibleForTesting
    static CompletableFuture<List<String>> getMissingPaths(ZooKeeperClient zkc, URI uri, String logName) {
        ZooKeeper zk;
        try {
            zk = zkc.get();
        } catch (ZooKeeperConnectionException | InterruptedException e) {
            return FutureUtils.exception(e);
        }
        String basePath = uri.getPath();
        String logStreamPath = LogMetadata.getLogStreamPath(uri, logName);
        return getMissingPaths(zk, basePath, logStreamPath);
    }

    @VisibleForTesting
    static CompletableFuture<List<String>> getMissingPaths(ZooKeeper zk, String basePath, String logStreamPath) {
        LinkedList<String> missingPaths = Lists.newLinkedList();
        CompletableFuture<List<String>> future = FutureUtils.createFuture();
        existPath(zk, logStreamPath, basePath, missingPaths, future);
        return future;
    }

    private static void existPath(ZooKeeper zk,
                                  String path,
                                  String basePath,
                                  LinkedList<String> missingPaths,
                                  CompletableFuture<List<String>> future) {
        if (basePath.equals(path)) {
            future.complete(missingPaths);
            return;
        }
        zk.exists(path, false, (rc, path1, ctx, stat) -> {
            if (Code.OK.intValue() != rc && Code.NONODE.intValue() != rc) {
                future.completeExceptionally(new ZKException("Failed to check existence of path " + path1,
                    Code.get(rc)));
                return;
            }

            if (Code.OK.intValue() == rc) {
                future.complete(missingPaths);
                return;
            }

            missingPaths.addLast(path);
            String parentPath = Utils.getParent(path);
            existPath(zk, parentPath, basePath, missingPaths, future);
        }, null);
    }

    private CompletableFuture<Void> executeRenameTxn(String oldLogPath,
                                                     String newLogPath,
                                                     LinkedList<Op> createOps,
                                                     LinkedList<Op> deleteOps) {
        CompletableFuture<Void> future = FutureUtils.createFuture();
        List<Op> zkOps = Lists.newArrayListWithExpectedSize(createOps.size() + deleteOps.size());
        zkOps.addAll(createOps);
        zkOps.addAll(deleteOps);

        if (LOG.isDebugEnabled()) {
            for (Op op : zkOps) {
                if (op instanceof Create) {
                    Create create = (Create) op;
                    LOG.debug("op : create {}", create.getPath());
                } else if (op instanceof Delete) {
                    Delete delete = (Delete) op;
                    LOG.debug("op : delete {}, record = {}", delete.getPath(), op.toRequestRecord());
                } else {
                    LOG.debug("op : {}", op);
                }
            }
        }

        try {
            zooKeeperClient.get().multi(zkOps, (rc, path, ctx, opResults) -> {
                if (Code.OK.intValue() == rc) {
                    future.complete(null);
                } else if (Code.NODEEXISTS.intValue() == rc) {
                    future.completeExceptionally(new LogExistsException("Someone just created new log " + newLogPath));
                } else if (Code.NOTEMPTY.intValue() == rc) {
                    future.completeExceptionally(new LockingException(oldLogPath + LOCK_PATH,
                        "Someone is holding a lock on log " + oldLogPath));
                } else {
                    future.completeExceptionally(new ZKException("Failed to rename log "
                        + oldLogPath + " to " + newLogPath + " at path " + path, Code.get(rc)));
                }
            }, null);
        } catch (ZooKeeperConnectionException e) {
            future.completeExceptionally(e);
        } catch (InterruptedException e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    private static void deleteOldSegmentAndCreateNewSegment(LogSegmentMetadata oldMetadata,
                                                            String newSegmentsPath,
                                                            List<ACL> acls,
                                                            LinkedList<Op> createOps,
                                                            LinkedList<Op> deleteOps) {
        createOps.addLast(Op.create(
            newSegmentsPath + "/" + oldMetadata.getZNodeName(),
            oldMetadata.getFinalisedData().getBytes(UTF_8),
            acls,
            CreateMode.PERSISTENT));
        deleteOps.addFirst(Op.delete(
            oldMetadata.getZkPath(),
            -1));
    }

    private static void deleteOldPathAndCreateNewPath(String oldRootPath,
                                                      String nodePath,
                                                      Versioned<byte[]> pathData,
                                                      String newRootPath,
                                                      byte[] initData,
                                                      List<ACL> acls,
                                                      LinkedList<Op> createOps,
                                                      LinkedList<Op> deleteOps) {
        if (pathExists(pathData)) {
            createOps.addLast(Op.create(
                newRootPath + nodePath, pathData.getValue(), acls, CreateMode.PERSISTENT));
            deleteOps.addFirst(Op.delete(
                oldRootPath + nodePath, (int) ((LongVersion) pathData.getVersion()).getLongVersion()));
        } else {
            createOps.addLast(Op.create(
                newRootPath + nodePath, initData, acls, CreateMode.PERSISTENT));
        }
    }

    @VisibleForTesting
    static CompletableFuture<List<LogSegmentMetadata>> getLogSegments(ZooKeeperClient zk,
                                                                      String logSegmentsPath) {
        CompletableFuture<List<LogSegmentMetadata>> future = FutureUtils.createFuture();
        try {
            zk.get().getChildren(logSegmentsPath, false, (rc, path, ctx, children, stat) -> {
                if (Code.OK.intValue() != rc) {
                    if (Code.NONODE.intValue() == rc) {
                        future.completeExceptionally(new LogNotFoundException("Log " + path + " not found"));
                    } else {
                        future.completeExceptionally(new ZKException("Failed to get log segments from " + path,
                            Code.get(rc)));
                    }
                    return;
                }

                // get all the segments
                List<CompletableFuture<LogSegmentMetadata>> futures =
                    Lists.newArrayListWithExpectedSize(children.size());
                for (String child : children) {
                    futures.add(LogSegmentMetadata.read(zk, logSegmentsPath + "/" + child));
                }
                FutureUtils.proxyTo(
                    FutureUtils.collect(futures),
                    future);
            }, null);
        } catch (ZooKeeperConnectionException e) {
            future.completeExceptionally(e);
        } catch (InterruptedException e) {
            future.completeExceptionally(e);
        }
        return future;
    }

}
