/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.admin;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ReadUtils;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.acl.ZKAccessControl;
import org.apache.distributedlog.impl.federated.FederatedZKLogMetadataStore;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.metadata.MetadataUpdater;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.thrift.AccessControlEntry;
import org.apache.distributedlog.tools.DistributedLogTool;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



/**
 * Admin Tool for DistributedLog.
 */
public class DistributedLogAdmin extends DistributedLogTool {

    static final Logger LOG = LoggerFactory.getLogger(DistributedLogAdmin.class);

    /**
     * Fix inprogress segment with lower ledger sequence number.
     *
     * @param namespace
     *          dl namespace
     * @param metadataUpdater
     *          metadata updater.
     * @param streamName
     *          stream name.
     * @param verbose
     *          print verbose messages.
     * @param interactive
     *          is confirmation needed before executing actual action.
     * @throws IOException
     */
    public static void fixInprogressSegmentWithLowerSequenceNumber(final Namespace namespace,
                                                                   final MetadataUpdater metadataUpdater,
                                                                   final String streamName,
                                                                   final boolean verbose,
                                                                   final boolean interactive) throws Exception {
        DistributedLogManager dlm = namespace.openLog(streamName);
        try {
            List<LogSegmentMetadata> segments = dlm.getLogSegments();
            if (verbose) {
                System.out.println("LogSegments for " + streamName + " : ");
                for (LogSegmentMetadata segment : segments) {
                    System.out.println(segment.getLogSegmentSequenceNumber() + "\t: " + segment);
                }
            }
            LOG.info("Get log segments for {} : {}", streamName, segments);
            // validate log segments
            long maxCompletedLogSegmentSequenceNumber = -1L;
            LogSegmentMetadata inprogressSegment = null;
            for (LogSegmentMetadata segment : segments) {
                if (!segment.isInProgress()) {
                    maxCompletedLogSegmentSequenceNumber = Math.max(maxCompletedLogSegmentSequenceNumber,
                            segment.getLogSegmentSequenceNumber());
                } else {
                    // we already found an inprogress segment
                    if (null != inprogressSegment) {
                        throw new DLIllegalStateException("Multiple inprogress segments found for stream "
                                + streamName + " : " + segments);
                    }
                    inprogressSegment = segment;
                }
            }
            if (null == inprogressSegment || inprogressSegment.getLogSegmentSequenceNumber()
                    > maxCompletedLogSegmentSequenceNumber) {
                // nothing to fix
                return;
            }
            final long newLogSegmentSequenceNumber = maxCompletedLogSegmentSequenceNumber + 1;
            if (interactive && !IOUtils.confirmPrompt("Confirm to fix (Y/N), Ctrl+C to break : ")) {
                return;
            }
            final LogSegmentMetadata newSegment =
                    FutureUtils.result(metadataUpdater.changeSequenceNumber(inprogressSegment,
                            newLogSegmentSequenceNumber));
            LOG.info("Fixed {} : {} -> {} ", streamName, inprogressSegment, newSegment);
            if (verbose) {
                System.out.println("Fixed " + streamName + " : " + inprogressSegment.getZNodeName()
                                   + " -> " + newSegment.getZNodeName());
                System.out.println("\t old: " + inprogressSegment);
                System.out.println("\t new: " + newSegment);
                System.out.println();
            }
        } finally {
            dlm.close();
        }
    }

    private static class LogSegmentCandidate {
        final LogSegmentMetadata metadata;
        final LogRecordWithDLSN lastRecord;

        LogSegmentCandidate(LogSegmentMetadata metadata, LogRecordWithDLSN lastRecord) {
            this.metadata = metadata;
            this.lastRecord = lastRecord;
        }

        @Override
        public String toString() {
            return "LogSegmentCandidate[ metadata = " + metadata + ", last record = " + lastRecord + " ]";
        }

    }

    private static final Comparator<LogSegmentCandidate> LOG_SEGMENT_CANDIDATE_COMPARATOR =
            new Comparator<LogSegmentCandidate>() {
                @Override
                public int compare(LogSegmentCandidate o1, LogSegmentCandidate o2) {
                    return LogSegmentMetadata.COMPARATOR.compare(o1.metadata, o2.metadata);
                }
            };

    private static class StreamCandidate {

        final String streamName;
        final SortedSet<LogSegmentCandidate> segmentCandidates =
                new TreeSet<LogSegmentCandidate>(LOG_SEGMENT_CANDIDATE_COMPARATOR);

        StreamCandidate(String streamName) {
            this.streamName = streamName;
        }

        synchronized void addLogSegmentCandidate(LogSegmentCandidate segmentCandidate) {
            segmentCandidates.add(segmentCandidate);
        }

        @Override
        public String toString() {
            return "StreamCandidate[ name = " + streamName + ", segments = " + segmentCandidates + " ]";
        }
    }

    public static void checkAndRepairDLNamespace(final URI uri,
                                                 final Namespace namespace,
                                                 final MetadataUpdater metadataUpdater,
                                                 final OrderedScheduler scheduler,
                                                 final boolean verbose,
                                                 final boolean interactive) throws Exception {
        checkAndRepairDLNamespace(uri, namespace, metadataUpdater, scheduler, verbose, interactive, 1);
    }

    public static void checkAndRepairDLNamespace(final URI uri,
                                                 final Namespace namespace,
                                                 final MetadataUpdater metadataUpdater,
                                                 final OrderedScheduler scheduler,
                                                 final boolean verbose,
                                                 final boolean interactive,
                                                 final int concurrency) throws Exception {
        checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found.");
        // 0. getting streams under a given uri.
        Iterator<String> streamsIter = namespace.getLogs();
        List<String> streams = Lists.newArrayList();
        while (streamsIter.hasNext()) {
            streams.add(streamsIter.next());
        }
        if (verbose) {
            System.out.println("- 0. checking streams under " + uri);
        }
        if (streams.size() == 0) {
            System.out.println("+ 0. nothing to check. quit.");
            return;
        }
        Map<String, StreamCandidate> streamCandidates =
                checkStreams(namespace, streams, scheduler, concurrency);
        if (verbose) {
            System.out.println("+ 0. " + streamCandidates.size() + " corrupted streams found.");
        }
        if (interactive && !IOUtils.confirmPrompt("Do you want to fix all "
                + streamCandidates.size() + " corrupted streams (Y/N) : ")) {
            return;
        }
        if (verbose) {
            System.out.println("- 1. repairing " + streamCandidates.size() + " corrupted streams.");
        }
        for (StreamCandidate candidate : streamCandidates.values()) {
            if (!repairStream(metadataUpdater, candidate, verbose, interactive)) {
                if (verbose) {
                    System.out.println("* 1. aborted repairing corrupted streams.");
                }
                return;
            }
        }
        if (verbose) {
            System.out.println("+ 1. repaired " + streamCandidates.size() + " corrupted streams.");
        }
    }

    private static Map<String, StreamCandidate> checkStreams(
            final Namespace namespace,
            final Collection<String> streams,
            final OrderedScheduler scheduler,
            final int concurrency) throws IOException {
        final LinkedBlockingQueue<String> streamQueue =
                new LinkedBlockingQueue<String>();
        streamQueue.addAll(streams);
        final Map<String, StreamCandidate> candidateMap =
                new ConcurrentSkipListMap<String, StreamCandidate>();
        final AtomicInteger numPendingStreams = new AtomicInteger(streams.size());
        final CountDownLatch doneLatch = new CountDownLatch(1);
        Runnable checkRunnable = new Runnable() {
            @Override
            public void run() {
                while (!streamQueue.isEmpty()) {
                    String stream;
                    try {
                        stream = streamQueue.take();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                    StreamCandidate candidate;
                    try {
                        LOG.info("Checking stream {}.", stream);
                        candidate = checkStream(namespace, stream, scheduler);
                        LOG.info("Checked stream {} - {}.", stream, candidate);
                    } catch (Throwable e) {
                        LOG.error("Error on checking stream {} : ", stream, e);
                        doneLatch.countDown();
                        break;
                    }
                    if (null != candidate) {
                        candidateMap.put(stream, candidate);
                    }
                    if (numPendingStreams.decrementAndGet() == 0) {
                        doneLatch.countDown();
                    }
                }
            }
        };
        Thread[] threads = new Thread[concurrency];
        for (int i = 0; i < concurrency; i++) {
            threads[i] = new Thread(checkRunnable, "check-thread-" + i);
            threads[i].start();
        }
        try {
            doneLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (numPendingStreams.get() != 0) {
            throw new IOException(numPendingStreams.get() + " streams left w/o checked");
        }
        for (int i = 0; i < concurrency; i++) {
            threads[i].interrupt();
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return candidateMap;
    }

    private static StreamCandidate checkStream(
            final Namespace namespace,
            final String streamName,
            final OrderedScheduler scheduler) throws IOException {
        DistributedLogManager dlm = namespace.openLog(streamName);
        try {
            List<LogSegmentMetadata> segments = dlm.getLogSegments();
            if (segments.isEmpty()) {
                return null;
            }
            List<CompletableFuture<LogSegmentCandidate>> futures =
                    new ArrayList<CompletableFuture<LogSegmentCandidate>>(segments.size());
            for (LogSegmentMetadata segment : segments) {
                futures.add(checkLogSegment(namespace, streamName, segment, scheduler));
            }
            List<LogSegmentCandidate> segmentCandidates;
            try {
                segmentCandidates = FutureUtils.result(FutureUtils.collect(futures));
            } catch (Exception e) {
                throw new IOException("Failed on checking stream " + streamName, e);
            }
            StreamCandidate streamCandidate = new StreamCandidate(streamName);
            for (LogSegmentCandidate segmentCandidate: segmentCandidates) {
                if (null != segmentCandidate) {
                    streamCandidate.addLogSegmentCandidate(segmentCandidate);
                }
            }
            if (streamCandidate.segmentCandidates.isEmpty()) {
                return null;
            }
            return streamCandidate;
        } finally {
            dlm.close();
        }
    }

    private static CompletableFuture<LogSegmentCandidate> checkLogSegment(
            final Namespace namespace,
            final String streamName,
            final LogSegmentMetadata metadata,
            final OrderedScheduler scheduler) {
        if (metadata.isInProgress()) {
            return FutureUtils.value(null);
        }

        final LogSegmentEntryStore entryStore = namespace.getNamespaceDriver()
                .getLogSegmentEntryStore(NamespaceDriver.Role.READER);
        return ReadUtils.asyncReadLastRecord(
                streamName,
                metadata,
                true,
                false,
                true,
                4,
                16,
                new AtomicInteger(0),
                scheduler,
                entryStore
        ).thenApply(new Function<LogRecordWithDLSN, LogSegmentCandidate>() {
            @Override
            public LogSegmentCandidate apply(LogRecordWithDLSN record) {
                if (null != record
                        && (record.getDlsn().compareTo(metadata.getLastDLSN()) > 0
                            || record.getTransactionId() > metadata.getLastTxId()
                            || !metadata.isRecordPositionWithinSegmentScope(record))) {
                    return new LogSegmentCandidate(metadata, record);
                } else {
                    return null;
                }
            }
        });
    }

    private static boolean repairStream(MetadataUpdater metadataUpdater,
                                        StreamCandidate streamCandidate,
                                        boolean verbose,
                                        boolean interactive) throws Exception {
        if (verbose) {
            System.out.println("Stream " + streamCandidate.streamName + " : ");
            for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) {
                System.out.println("  " + segmentCandidate.metadata.getLogSegmentSequenceNumber()
                        + " : metadata = " + segmentCandidate.metadata + ", last dlsn = "
                        + segmentCandidate.lastRecord.getDlsn());
            }
            System.out.println("-------------------------------------------");
        }
        if (interactive && !IOUtils.confirmPrompt("Do you want to fix the stream "
                + streamCandidate.streamName + " (Y/N) : ")) {
            return false;
        }
        for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) {
            LogSegmentMetadata newMetadata = FutureUtils.result(
                    metadataUpdater.updateLastRecord(segmentCandidate.metadata, segmentCandidate.lastRecord));
            if (verbose) {
                System.out.println(" Fixed segment " + segmentCandidate.metadata.getLogSegmentSequenceNumber() + " : ");
                System.out.println("    old metadata : " + segmentCandidate.metadata);
                System.out.println("    new metadata : " + newMetadata);
            }
        }
        if (verbose) {
            System.out.println("-------------------------------------------");
        }
        return true;
    }

    //
    // Commands
    //

    /**
     * Unbind the bookkeeper environment for a given distributedlog uri.
     * TODO: move unbind operation to namespace driver
     */
    class UnbindCommand extends OptsCommand {

        Options options = new Options();

        UnbindCommand() {
            super("unbind", "unbind the bookkeeper environment bound for a given distributedlog instance.");
            options.addOption("f", "force", false, "Force unbinding without prompt.");
        }

        @Override
        protected Options getOptions() {
            return options;
        }

        @Override
        protected String getUsage() {
            return "unbind [options] <distributedlog uri>";
        }

        @Override
        protected int runCmd(CommandLine cmdline) throws Exception {
            String[] args = cmdline.getArgs();
            if (args.length <= 0) {
                System.err.println("No distributedlog uri specified.");
                printUsage();
                return -1;
            }
            boolean force = cmdline.hasOption("f");
            URI uri = URI.create(args[0]);
            // resolving the uri to see if there is another bindings in this uri.
            ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri).zkAclId(null)
                    .sessionTimeoutMs(10000).build();
            BKDLConfig bkdlConfig;
            try {
                bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
            } catch (IOException ie) {
                bkdlConfig = null;
            }
            if (null == bkdlConfig) {
                System.out.println("No bookkeeper is bound to " + uri);
                return 0;
            } else {
                System.out.println("There is bookkeeper bound to " + uri + " : ");
                System.out.println("");
                System.out.println(bkdlConfig.toString());
                System.out.println("");
                if (!force && !IOUtils.confirmPrompt("Do you want to unbind " + uri + " :\n")) {
                    return 0;
                }
            }
            DLMetadata.unbind(uri);
            System.out.println("Unbound on " + uri + ".");
            return 0;
        }
    }

    /**
     * Bind Command to bind bookkeeper environment for a given distributed uri.
     * TODO: move bind to namespace driver
     */
    class BindCommand extends OptsCommand {

        Options options = new Options();

        BindCommand() {
            super("bind", "bind the bookkeeper environment settings for a given distributedlog instance.");
            options.addOption("l", "bkLedgers", true, "ZooKeeper ledgers path for bookkeeper instance.");
            options.addOption("s", "bkZkServers", true, "ZooKeeper servers used for bookkeeper for writers.");
            options.addOption("bkzr", "bkZkServersForReader", true,
                    "ZooKeeper servers used for bookkeeper for readers.");
            options.addOption("dlzw", "dlZkServersForWriter", true,
                    "ZooKeeper servers used for distributedlog for writers.");
            options.addOption("dlzr", "dlZkServersForReader", true,
                    "ZooKeeper servers used for distributedlog for readers.");
            options.addOption("i", "sanityCheckTxnID", true, "Flag to sanity check highest txn id.");
            options.addOption("r", "encodeRegionID", true, "Flag to encode region id.");
            options.addOption("seqno", "firstLogSegmentSeqNo", true,
                    "The first log segment sequence number to use after upgrade");
            options.addOption("fns", "federatedNamespace", false, "Flag to turn a namespace to federated namespace");
            options.addOption("f", "force", false, "Force binding without prompt.");
            options.addOption("c", "creation", false, "Whether is it a creation binding.");
            options.addOption("q", "query", false, "Query the bookkeeper bindings");
        }

        @Override
        protected Options getOptions() {
            return options;
        }

        @Override
        protected String getUsage() {
            return "bind [options] <distributedlog uri>";
        }

        @Override
        protected int runCmd(CommandLine cmdline) throws Exception {
            boolean isQuery = cmdline.hasOption("q");
            if (!isQuery && (!cmdline.hasOption("l") || !cmdline.hasOption("s"))) {
                System.err.println("Error: Neither zkServers nor ledgersPath specified for bookkeeper environment.");
                printUsage();
                return -1;
            }
            String[] args = cmdline.getArgs();
            if (args.length <= 0) {
                System.err.println("No distributedlog uri specified.");
                printUsage();
                return -1;
            }
            boolean force = cmdline.hasOption("f");
            boolean creation = cmdline.hasOption("c");
            String bkLedgersPath = cmdline.getOptionValue("l");
            String bkZkServersForWriter = cmdline.getOptionValue("s");
            boolean sanityCheckTxnID =
                    !cmdline.hasOption("i") || Boolean.parseBoolean(cmdline.getOptionValue("i"));
            boolean encodeRegionID =
                    cmdline.hasOption("r") && Boolean.parseBoolean(cmdline.getOptionValue("r"));

            String bkZkServersForReader;
            if (cmdline.hasOption("bkzr")) {
                bkZkServersForReader = cmdline.getOptionValue("bkzr");
            } else {
                bkZkServersForReader = bkZkServersForWriter;
            }

            URI uri = URI.create(args[0]);

            String dlZkServersForWriter;
            String dlZkServersForReader;
            if (cmdline.hasOption("dlzw")) {
                dlZkServersForWriter = cmdline.getOptionValue("dlzw");
            } else {
                dlZkServersForWriter = BKNamespaceDriver.getZKServersFromDLUri(uri);
            }
            if (cmdline.hasOption("dlzr")) {
                dlZkServersForReader = cmdline.getOptionValue("dlzr");
            } else {
                dlZkServersForReader = dlZkServersForWriter;
            }

            // resolving the uri to see if there is another bindings in this uri.
            ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri).zkAclId(null)
                    .sessionTimeoutMs(10000).build();
            try {
                BKDLConfig newBKDLConfig =
                        new BKDLConfig(dlZkServersForWriter, dlZkServersForReader,
                                       bkZkServersForWriter, bkZkServersForReader, bkLedgersPath)
                                .setSanityCheckTxnID(sanityCheckTxnID)
                                .setEncodeRegionID(encodeRegionID);

                if (cmdline.hasOption("seqno")) {
                    newBKDLConfig = newBKDLConfig.setFirstLogSegmentSeqNo(
                            Long.parseLong(cmdline.getOptionValue("seqno")));
                }

                if (cmdline.hasOption("fns")) {
                    newBKDLConfig = newBKDLConfig.setFederatedNamespace(true);
                }

                BKDLConfig bkdlConfig;
                try {
                    bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
                } catch (IOException ie) {
                    bkdlConfig = null;
                }
                if (null == bkdlConfig) {
                    System.out.println("No bookkeeper is bound to " + uri);
                } else {
                    System.out.println("There is bookkeeper bound to " + uri + " : ");
                    System.out.println("");
                    System.out.println(bkdlConfig.toString());
                    System.out.println("");
                    if (!isQuery) {
                        if (newBKDLConfig.equals(bkdlConfig)) {
                            System.out.println("No bookkeeper binding needs to be updated. Quit.");
                            return 0;
                        } else if (!newBKDLConfig.isFederatedNamespace() && bkdlConfig.isFederatedNamespace()) {
                            System.out.println("You can't turn a federated namespace back to non-federated.");
                            return 0;
                        } else {
                            if (!force && !IOUtils.confirmPrompt("Do you want to bind " + uri
                                        + " with new bookkeeper instance :\n" + newBKDLConfig)) {
                                return 0;
                            }
                        }
                    }
                }
                if (isQuery) {
                    System.out.println("Done.");
                    return 0;
                }
                DLMetadata dlMetadata = DLMetadata.create(newBKDLConfig);
                if (creation) {
                    try {
                        dlMetadata.create(uri);
                        System.out.println("Created binding on " + uri + ".");
                    } catch (IOException ie) {
                        System.err.println("Failed to create binding on " + uri + " : " + ie.getMessage());
                    }
                } else {
                    try {
                        dlMetadata.update(uri);
                        System.out.println("Updated binding on " + uri + " : ");
                        System.out.println("");
                        System.out.println(newBKDLConfig.toString());
                        System.out.println("");
                    } catch (IOException ie) {
                        System.err.println("Failed to update binding on " + uri + " : " + ie.getMessage());
                    }
                }
                if (newBKDLConfig.isFederatedNamespace()) {
                    try {
                        FederatedZKLogMetadataStore.createFederatedNamespace(uri, zkc);
                    } catch (KeeperException.NodeExistsException nee) {
                        // ignore node exists exception
                    }
                }
                return 0;
            } finally {
                zkc.close();
            }
        }
    }

    static class RepairSeqNoCommand extends PerDLCommand {

        boolean dryrun = false;
        boolean verbose = false;
        final List<String> streams = new ArrayList<String>();

        RepairSeqNoCommand() {
            super("repairseqno", "Repair a stream whose inprogress log segment has lower sequence number.");
            options.addOption("d", "dryrun", false, "Dry run without repairing");
            options.addOption("l", "list", true, "List of streams to repair, separated by comma");
            options.addOption("v", "verbose", false, "Print verbose messages");
        }

        @Override
        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
            super.parseCommandLine(cmdline);
            dryrun = cmdline.hasOption("d");
            verbose = cmdline.hasOption("v");
            force = !dryrun && cmdline.hasOption("f");
            if (!cmdline.hasOption("l")) {
                throw new ParseException("No streams provided to repair");
            }
            String streamsList = cmdline.getOptionValue("l");
            Collections.addAll(streams, streamsList.split(","));
        }

        @Override
        protected int runCmd() throws Exception {
            MetadataUpdater metadataUpdater = dryrun
                    ? new DryrunLogSegmentMetadataStoreUpdater(getConf(),
                            getLogSegmentMetadataStore()) :
                    LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(),
                            getLogSegmentMetadataStore());
            System.out.println("List of streams : ");
            System.out.println(streams);
            if (!IOUtils.confirmPrompt("Do you want to repair all these streams (Y/N):")) {
                return -1;
            }
            for (String stream : streams) {
                fixInprogressSegmentWithLowerSequenceNumber(getNamespace(),
                        metadataUpdater, stream, verbose, !getForce());
            }
            return 0;
        }

        @Override
        protected String getUsage() {
            return "repairseqno [options]";
        }
    }

    static class DLCKCommand extends PerDLCommand {

        boolean dryrun = false;
        boolean verbose = false;
        int concurrency = 1;

        DLCKCommand() {
            super("dlck", "Check and repair a distributedlog namespace");
            options.addOption("d", "dryrun", false, "Dry run without repairing");
            options.addOption("v", "verbose", false, "Print verbose messages");
            options.addOption("cy", "concurrency", true, "Concurrency on checking streams");
        }

        @Override
        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
            super.parseCommandLine(cmdline);
            dryrun = cmdline.hasOption("d");
            verbose = cmdline.hasOption("v");
            if (cmdline.hasOption("cy")) {
                try {
                    concurrency = Integer.parseInt(cmdline.getOptionValue("cy"));
                } catch (NumberFormatException nfe) {
                    throw new ParseException("Invalid concurrency value : " + cmdline.getOptionValue("cy"));
                }
            }
        }

        @Override
        protected int runCmd() throws Exception {
            MetadataUpdater metadataUpdater = dryrun
                    ? new DryrunLogSegmentMetadataStoreUpdater(getConf(),
                            getLogSegmentMetadataStore()) :
                    LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(),
                            getLogSegmentMetadataStore());
            OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
                    .name("dlck-scheduler")
                    .numThreads(Runtime.getRuntime().availableProcessors())
                    .build();
            ExecutorService executorService = Executors.newCachedThreadPool();
            try {
                checkAndRepairDLNamespace(getUri(), getNamespace(), metadataUpdater, scheduler,
                                          verbose, !getForce(), concurrency);
            } finally {
                SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES);
            }
            return 0;
        }

        @Override
        protected String getUsage() {
            return "dlck [options]";
        }
    }

    static class DeleteStreamACLCommand extends PerDLCommand {

        String stream = null;

        DeleteStreamACLCommand() {
            super("delete_stream_acl", "Delete ACL for a given stream");
            options.addOption("s", "stream", true, "Stream to set ACL");
        }

        @Override
        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
            super.parseCommandLine(cmdline);
            if (!cmdline.hasOption("s")) {
                throw new ParseException("No stream to set ACL");
            }
            stream = cmdline.getOptionValue("s");
        }

        @Override
        protected int runCmd() throws Exception {
            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri());
            if (null == bkdlConfig.getACLRootPath()) {
                // acl isn't enabled for this namespace.
                System.err.println("ACL isn't enabled for namespace " + getUri());
                return -1;
            }
            String zkPath = getUri() + "/" + bkdlConfig.getACLRootPath() + "/" + stream;
            ZKAccessControl.delete(getZooKeeperClient(), zkPath);
            return 0;
        }

        @Override
        protected String getUsage() {
            return null;
        }
    }

    static class SetStreamACLCommand extends SetACLCommand {

        String stream = null;

        SetStreamACLCommand() {
            super("set_stream_acl", "Set Default ACL for a given stream");
            options.addOption("s", "stream", true, "Stream to set ACL");
        }

        @Override
        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
            super.parseCommandLine(cmdline);
            if (!cmdline.hasOption("s")) {
                throw new ParseException("No stream to set ACL");
            }
            stream = cmdline.getOptionValue("s");
        }

        @Override
        protected String getZKPath(String zkRootPath) {
            return zkRootPath + "/" + stream;
        }

        @Override
        protected String getUsage() {
            return "set_stream_acl [options]";
        }
    }

    static class SetDefaultACLCommand extends SetACLCommand {

        SetDefaultACLCommand() {
            super("set_default_acl", "Set Default ACL for a namespace");
        }

        @Override
        protected String getZKPath(String zkRootPath) {
            return zkRootPath;
        }

        @Override
        protected String getUsage() {
            return "set_default_acl [options]";
        }
    }

    abstract static  class SetACLCommand extends PerDLCommand {

        boolean denyWrite = false;
        boolean denyTruncate = false;
        boolean denyDelete = false;
        boolean denyAcquire = false;
        boolean denyRelease = false;

        protected SetACLCommand(String name, String description) {
            super(name, description);
            options.addOption("dw", "deny-write", false, "Deny write/bulkWrite requests");
            options.addOption("dt", "deny-truncate", false, "Deny truncate requests");
            options.addOption("dd", "deny-delete", false, "Deny delete requests");
            options.addOption("da", "deny-acquire", false, "Deny acquire requests");
            options.addOption("dr", "deny-release", false, "Deny release requests");
        }

        @Override
        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
            super.parseCommandLine(cmdline);
            denyWrite = cmdline.hasOption("dw");
            denyTruncate = cmdline.hasOption("dt");
            denyDelete = cmdline.hasOption("dd");
            denyAcquire = cmdline.hasOption("da");
            denyRelease = cmdline.hasOption("dr");
        }

        protected abstract String getZKPath(String zkRootPath);

        protected ZKAccessControl getZKAccessControl(ZooKeeperClient zkc, String zkPath) throws Exception {
            ZKAccessControl accessControl;
            try {
                accessControl = FutureUtils.result(ZKAccessControl.read(zkc, zkPath, null));
            } catch (KeeperException.NoNodeException nne) {
                accessControl = new ZKAccessControl(new AccessControlEntry(), zkPath);
            }
            return accessControl;
        }

        protected void setZKAccessControl(ZooKeeperClient zkc, ZKAccessControl accessControl) throws Exception {
            String zkPath = accessControl.getZKPath();
            if (null == zkc.get().exists(zkPath, false)) {
                accessControl.create(zkc);
            } else {
                accessControl.update(zkc);
            }
        }

        @Override
        protected int runCmd() throws Exception {
            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri());
            if (null == bkdlConfig.getACLRootPath()) {
                // acl isn't enabled for this namespace.
                System.err.println("ACL isn't enabled for namespace " + getUri());
                return -1;
            }
            String zkPath = getZKPath(getUri().getPath() + "/" + bkdlConfig.getACLRootPath());
            ZKAccessControl accessControl = getZKAccessControl(getZooKeeperClient(), zkPath);
            AccessControlEntry acl = accessControl.getAccessControlEntry();
            acl.setDenyWrite(denyWrite);
            acl.setDenyTruncate(denyTruncate);
            acl.setDenyDelete(denyDelete);
            acl.setDenyAcquire(denyAcquire);
            acl.setDenyRelease(denyRelease);
            setZKAccessControl(getZooKeeperClient(), accessControl);
            return 0;
        }

    }

    public DistributedLogAdmin() {
        super();
        commands.clear();
        addCommand(new HelpCommand());
        addCommand(new BindCommand());
        addCommand(new UnbindCommand());
        addCommand(new RepairSeqNoCommand());
        addCommand(new DLCKCommand());
        addCommand(new SetDefaultACLCommand());
        addCommand(new SetStreamACLCommand());
        addCommand(new DeleteStreamACLCommand());
    }

    @Override
    protected String getName() {
        return "dlog_admin";
    }
}
