/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.util;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.common.functions.VoidFunctions;
import org.apache.distributedlog.io.AsyncAbortable;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

/**
 * Basic Utilities.
 */
@Slf4j
public class Utils {

    /**
     * Current time from some arbitrary time base in the past, counting in
     * nanoseconds, and not affected by settimeofday or similar system clock
     * changes. This is appropriate to use when computing how much longer to
     * wait for an interval to expire.
     *
     * @return current time in nanoseconds.
     */
    public static long nowInNanos() {
        return System.nanoTime();
    }

    /**
     * Current time from some fixed base time - so useful for cross machine
     * comparison
     *
     * @return current time in milliseconds.
     */
    public static long nowInMillis() {
        return System.currentTimeMillis();
    }

    /**
     * Milliseconds elapsed since the time specified, the input is nanoTime
     * the only conversion happens when computing the elapsed time
     *
     * @param startMsecTime the start of the interval that we are measuring
     * @return elapsed time in milliseconds.
     */
    public static long elapsedMSec(long startMsecTime) {
        return (System.currentTimeMillis() - startMsecTime);
    }

    public static boolean randomPercent(double percent) {
        return (Math.random() * 100.0) <= percent;
    }

    /**
     * Synchronously create zookeeper path recursively and optimistically.
     *
     * @see #zkAsyncCreateFullPathOptimistic(ZooKeeperClient, String, byte[], List, CreateMode)
     * @param zkc Zookeeper client
     * @param path Zookeeper full path
     * @param data Zookeeper data
     * @param acl Acl of the zk path
     * @param createMode Create mode of zk path
     * @throws ZooKeeperClient.ZooKeeperConnectionException
     * @throws KeeperException
     * @throws InterruptedException
     */
    public static void zkCreateFullPathOptimistic(
        ZooKeeperClient zkc,
        String path,
        byte[] data,
        final List<ACL> acl,
        final CreateMode createMode) throws IOException, KeeperException {
        try {
            FutureUtils.result(zkAsyncCreateFullPathOptimistic(zkc, path, data, acl, createMode));
        } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
            throw zkce;
        } catch (KeeperException ke) {
            throw ke;
        } catch (InterruptedException ie) {
            throw new DLInterruptedException("Interrupted on create zookeeper path " + path, ie);
        } catch (RuntimeException rte) {
            throw rte;
        } catch (Exception exc) {
            throw new RuntimeException("Unexpected Exception", exc);
        }
    }

    /**
     * Asynchronously create zookeeper path recursively and optimistically.
     *
     * @param zkc Zookeeper client
     * @param pathToCreate  Zookeeper full path
     * @param parentPathShouldNotCreate The recursive creation should stop if this path doesn't exist
     * @param data Zookeeper data
     * @param acl Acl of the zk path
     * @param createMode Create mode of zk path
     * @param callback Callback
     * @param ctx Context object
     */
    public static void zkAsyncCreateFullPathOptimisticRecursive(
        final ZooKeeperClient zkc,
        final String pathToCreate,
        final Optional<String> parentPathShouldNotCreate,
        final byte[] data,
        final List<ACL> acl,
        final CreateMode createMode,
        final AsyncCallback.StringCallback callback,
        final Object ctx) {
        try {
            zkc.get().create(pathToCreate, data, acl, createMode, new AsyncCallback.StringCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx, String name) {

                    if (rc != KeeperException.Code.NONODE.intValue()) {
                        callback.processResult(rc, path, ctx, name);
                        return;
                    }

                    // Since we got a nonode, it means that my parents may not exist
                    // ephemeral nodes can't have children so Create mode is always
                    // persistent parents
                    int lastSlash = pathToCreate.lastIndexOf('/');
                    if (lastSlash <= 0) {
                        callback.processResult(rc, path, ctx, name);
                        return;
                    }
                    String parent = pathToCreate.substring(0, lastSlash);
                    if (parentPathShouldNotCreate.isPresent() && Objects.equal(parentPathShouldNotCreate.get(), parent)) {
                        // we should stop here
                        callback.processResult(rc, path, ctx, name);
                        return;
                    }
                    zkAsyncCreateFullPathOptimisticRecursive(zkc, parent, parentPathShouldNotCreate, new byte[0], acl,
                            CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
                                @Override
                                public void processResult(int rc, String path, Object ctx, String name) {
                                    if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NODEEXISTS.intValue()) {
                                        // succeeded in creating the parent, now create the original path
                                        zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate,
                                                data, acl, createMode, callback, ctx);
                                    } else {
                                        callback.processResult(rc, path, ctx, name);
                                    }
                                }
                            }, ctx);
                }
            }, ctx);
        } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
            callback.processResult(DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE, zkce.getMessage(), ctx, pathToCreate);
        } catch (InterruptedException ie) {
            callback.processResult(DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE, ie.getMessage(), ctx, pathToCreate);
        }
    }

    /**
     * Asynchronously create zookeeper path recursively and optimistically.
     *
     * @param zkc Zookeeper client
     * @param pathToCreate  Zookeeper full path
     * @param data Zookeeper data
     * @param acl Acl of the zk path
     * @param createMode Create mode of zk path
     */
    public static CompletableFuture<Void> zkAsyncCreateFullPathOptimistic(
        final ZooKeeperClient zkc,
        final String pathToCreate,
        final byte[] data,
        final List<ACL> acl,
        final CreateMode createMode) {
        Optional<String> parentPathShouldNotCreate = Optional.absent();
        return zkAsyncCreateFullPathOptimistic(
                zkc,
                pathToCreate,
                parentPathShouldNotCreate,
                data,
                acl,
                createMode);
    }

    /**
     * Asynchronously create zookeeper path recursively and optimistically
     *
     * @param zkc Zookeeper client
     * @param pathToCreate  Zookeeper full path
     * @param parentPathShouldNotCreate zookeeper parent path should not be created
     * @param data Zookeeper data
     * @param acl Acl of the zk path
     * @param createMode Create mode of zk path
     */
    public static CompletableFuture<Void> zkAsyncCreateFullPathOptimistic(
        final ZooKeeperClient zkc,
        final String pathToCreate,
        final Optional<String> parentPathShouldNotCreate,
        final byte[] data,
        final List<ACL> acl,
        final CreateMode createMode) {
        final CompletableFuture<Void> result = new CompletableFuture<Void>();

        zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate,
                data, acl, createMode, new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                handleKeeperExceptionCode(rc, path, result);
            }
        }, result);

        return result;
    }

    /**
     * Asynchronously create zookeeper path recursively and optimistically.
     *
     * @param zkc Zookeeper client
     * @param pathToCreate  Zookeeper full path
     * @param data Zookeeper data
     * @param acl Acl of the zk path
     * @param createMode Create mode of zk path
     */
    public static CompletableFuture<Void> zkAsyncCreateFullPathOptimisticAndSetData(
        final ZooKeeperClient zkc,
        final String pathToCreate,
        final byte[] data,
        final List<ACL> acl,
        final CreateMode createMode) {
        final CompletableFuture<Void> result = new CompletableFuture<Void>();

        try {
            zkc.get().setData(pathToCreate, data, -1, new AsyncCallback.StatCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx, Stat stat) {
                    if (rc != KeeperException.Code.NONODE.intValue()) {
                        handleKeeperExceptionCode(rc, path, result);
                        return;
                    }

                    Optional<String> parentPathShouldNotCreate = Optional.absent();
                    zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate,
                            data, acl, createMode, new AsyncCallback.StringCallback() {
                        @Override
                        public void processResult(int rc, String path, Object ctx, String name) {
                            handleKeeperExceptionCode(rc, path, result);
                        }
                    }, result);
                }
            }, result);
        } catch (Exception exc) {
            result.completeExceptionally(exc);
        }

        return result;
    }

    private static void handleKeeperExceptionCode(int rc, String pathOrMessage, CompletableFuture<Void> result) {
        if (KeeperException.Code.OK.intValue() == rc) {
            result.complete(null);
        } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
            result.completeExceptionally(new ZooKeeperClient.ZooKeeperConnectionException(pathOrMessage));
        } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
            result.completeExceptionally(new DLInterruptedException(pathOrMessage));
        } else {
            result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), pathOrMessage));
        }
    }

    public static CompletableFuture<Versioned<byte[]>> zkGetData(ZooKeeperClient zkc, String path, boolean watch) {
        ZooKeeper zk;
        try {
            zk = zkc.get();
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            return FutureUtils.exception(zkException(e, path));
        } catch (InterruptedException e) {
            return FutureUtils.exception(zkException(e, path));
        }
        return zkGetData(zk, path, watch);
    }

    /**
     * Retrieve data from zookeeper <code>path</code>.
     *
     * @param path
     *          zookeeper path to retrieve data
     * @param watch
     *          whether to watch the path
     * @return future representing the versioned value. null version or null value means path doesn't exist.
     */
    public static CompletableFuture<Versioned<byte[]>> zkGetData(ZooKeeper zk, String path, boolean watch) {
        final CompletableFuture<Versioned<byte[]>> promise = new CompletableFuture<Versioned<byte[]>>();
        zk.getData(path, watch, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                if (KeeperException.Code.OK.intValue() == rc) {
                    if (null == stat) {
                        promise.complete(new Versioned<byte[]>(null, null));
                    } else {
                        promise.complete(new Versioned<byte[]>(data, new ZkVersion(stat.getVersion())));
                    }
                } else if (KeeperException.Code.NONODE.intValue() == rc) {
                    promise.complete(new Versioned<byte[]>(null, null));
                } else {
                    promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                }
            }
        }, null);
        return promise;
    }

    public static CompletableFuture<ZkVersion> zkSetData(ZooKeeperClient zkc, String path, byte[] data, ZkVersion version) {
        ZooKeeper zk;
        try {
            zk = zkc.get();
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            return FutureUtils.exception(zkException(e, path));
        } catch (InterruptedException e) {
            return FutureUtils.exception(zkException(e, path));
        }
        return zkSetData(zk, path, data, version);
    }

    /**
     * Set <code>data</code> to zookeeper <code>path</code>.
     *
     * @param zk
     *          zookeeper client
     * @param path
     *          path to set data
     * @param data
     *          data to set
     * @param version
     *          version used to set data
     * @return future representing the version after this operation.
     */
    public static CompletableFuture<ZkVersion> zkSetData(ZooKeeper zk, String path, byte[] data, ZkVersion version) {
        final CompletableFuture<ZkVersion> promise = new CompletableFuture<ZkVersion>();
        zk.setData(path, data, version.getZnodeVersion(), new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                if (KeeperException.Code.OK.intValue() == rc) {
                    promise.complete(new ZkVersion(stat.getVersion()));
                    return;
                }
                promise.completeExceptionally(
                        KeeperException.create(KeeperException.Code.get(rc)));
                return;
            }
        }, null);
        return promise;
    }

    public static CompletableFuture<Void> zkDelete(ZooKeeperClient zkc, String path, ZkVersion version) {
        ZooKeeper zk;
        try {
            zk = zkc.get();
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            return FutureUtils.exception(zkException(e, path));
        } catch (InterruptedException e) {
            return FutureUtils.exception(zkException(e, path));
        }
        return zkDelete(zk, path, version);
    }

    /**
     * Delete the given <i>path</i> from zookeeper.
     *
     * @param zk
     *          zookeeper client
     * @param path
     *          path to delete
     * @param version
     *          version used to set data
     * @return future representing the version after this operation.
     */
    public static CompletableFuture<Void> zkDelete(ZooKeeper zk, String path, ZkVersion version) {
        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
        zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                if (KeeperException.Code.OK.intValue() == rc) {
                    promise.complete(null);
                    return;
                }
                promise.completeExceptionally(
                        KeeperException.create(KeeperException.Code.get(rc)));
                return;
            }
        }, null);
        return promise;
    }

    /**
     * Delete the given <i>path</i> from zookeeper.
     *
     * @param zkc
     *          zookeeper client
     * @param path
     *          path to delete
     * @param version
     *          version used to set data
     * @return future representing if the delete is successful. Return true if the node is deleted,
     * false if the node doesn't exist, otherwise future will throw exception
     *
     */
    public static CompletableFuture<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) {
        ZooKeeper zk;
        try {
            zk = zkc.get();
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            return FutureUtils.exception(zkException(e, path));
        } catch (InterruptedException e) {
            return FutureUtils.exception(zkException(e, path));
        }
        final CompletableFuture<Boolean> promise = new CompletableFuture<Boolean>();
        zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                if (KeeperException.Code.OK.intValue() == rc ) {
                    promise.complete(true);
                } else if (KeeperException.Code.NONODE.intValue() == rc) {
                    promise.complete(false);
                } else {
                    promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                }
            }
        }, null);
        return promise;
    }

    public static CompletableFuture<Void> asyncClose(@Nullable AsyncCloseable closeable,
                                          boolean swallowIOException) {
        if (null == closeable) {
            return FutureUtils.Void();
        } else if (swallowIOException) {
            return FutureUtils.ignore(closeable.asyncClose());
        } else {
            return closeable.asyncClose();
        }
    }

    /**
     * Sync zookeeper client on given <i>path</i>.
     *
     * @param zkc
     *          zookeeper client
     * @param path
     *          path to sync
     * @return zookeeper client after sync
     * @throws IOException
     */
    public static ZooKeeper sync(ZooKeeperClient zkc, String path) throws IOException {
        ZooKeeper zk;
        try {
            zk = zkc.get();
        } catch (InterruptedException e) {
            throw new DLInterruptedException("Interrupted on checking if log " + path + " exists", e);
        }
        final CountDownLatch syncLatch = new CountDownLatch(1);
        final AtomicInteger syncResult = new AtomicInteger(0);
        zk.sync(path, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                syncResult.set(rc);
                syncLatch.countDown();
            }
        }, null);
        try {
            syncLatch.await();
        } catch (InterruptedException e) {
            throw new DLInterruptedException("Interrupted on syncing zookeeper connection", e);
        }
        if (KeeperException.Code.OK.intValue() != syncResult.get()) {
            throw new ZKException("Error syncing zookeeper connection ",
                    KeeperException.Code.get(syncResult.get()));
        }
        return zk;
    }

    /**
     * Close a closeable.
     *
     * @param closeable
     *          closeable to close
     */
    public static void close(@Nullable Closeable closeable) {
        if (null == closeable) {
            return;
        }
        try {
            Closeables.close(closeable, true);
        } catch (IOException e) {
            // no-op. the exception is swallowed.
        }
    }

    /**
     * Close an async closeable.
     *
     * @param closeable
     *          closeable to close
     */
    public static void close(@Nullable AsyncCloseable closeable)
            throws IOException {
        if (null == closeable) {
            return;
        }
        Utils.ioResult(closeable.asyncClose());
    }

    /**
     * Close an async closeable.
     *
     * @param closeable
     *          closeable to close
     */
    public static void closeQuietly(@Nullable AsyncCloseable closeable) {
        if (null == closeable) {
            return;
        }
        try {
            Utils.ioResult(closeable.asyncClose());
        } catch (IOException e) {
            // no-op. the exception is swallowed.
        }
    }

    /**
     * Close the closeables in sequence.
     *
     * @param closeables
     *          closeables to close
     * @return future represents the close future
     */
    public static CompletableFuture<Void> closeSequence(ExecutorService executorService,
                                             AsyncCloseable... closeables) {
        return closeSequence(executorService, false, closeables);
    }

    /**
     * Close the closeables in sequence and ignore errors during closing.
     *
     * @param executorService executor to execute closeable
     * @param ignoreCloseError whether to ignore errors during closing
     * @param closeables list of closeables
     * @return future represents the close future.
     */
    public static CompletableFuture<Void> closeSequence(ExecutorService executorService,
                                             boolean ignoreCloseError,
                                             AsyncCloseable... closeables) {
        List<AsyncCloseable> closeableList = Lists.newArrayListWithExpectedSize(closeables.length);
        for (AsyncCloseable closeable : closeables) {
            if (null == closeable) {
                closeableList.add(AsyncCloseable.NULL);
            } else {
                closeableList.add(closeable);
            }
        }
        return FutureUtils.processList(
                closeableList,
                ignoreCloseError ? AsyncCloseable.CLOSE_FUNC_IGNORE_ERRORS : AsyncCloseable.CLOSE_FUNC,
                executorService
        ).thenApply(VoidFunctions.LIST_TO_VOID_FUNC);
    }

    /**
     * Gets the parent of a path.
     *
     * @param path
     *            path to get the parent of
     * @return parent of the path or null if no parent exists.
     */
    public static String getParent(final String path) {
        if (path == null) {
            return null;
        }
        if (path.length() < 2) {
            return null;
        }
        int firstIndex = path.indexOf("/");
        if (firstIndex == -1) {
            return null;
        }
        int lastIndex = path.lastIndexOf("/");
        if (lastIndex == path.length() - 1) {
            lastIndex = path.substring(0, path.length() - 1).lastIndexOf("/");
        }
        if (lastIndex == -1) {
            return null;
        }
        if (lastIndex == 0) {
            return "/";
        }
        return path.substring(0, lastIndex);
    }

    /**
     * Convert the <i>throwable</i> to zookeeper related exceptions.
     *
     * @param throwable cause
     * @param path zookeeper path
     * @return zookeeper related exceptions
     */
    public static Throwable zkException(Throwable throwable, String path) {
        if (throwable instanceof KeeperException) {
            return new ZKException("Encountered zookeeper exception on " + path, (KeeperException) throwable);
        } else if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) {
            return new ZKException("Encountered zookeeper connection loss on " + path,
                    KeeperException.Code.CONNECTIONLOSS);
        } else if (throwable instanceof InterruptedException) {
            return new DLInterruptedException("Interrupted on operating " + path, throwable);
        } else {
            return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable);
        }
    }

    /**
     * Create transmit exception from transmit result.
     *
     * @param transmitResult
     *          transmit result (basically bk exception code)
     * @return transmit exception
     */
    public static BKTransmitException transmitException(int transmitResult) {
        return new BKTransmitException("Failed to write to bookkeeper; Error is ("
            + transmitResult + ") "
            + BKException.getMessage(transmitResult), transmitResult);
    }

    /**
     * A specific version of {@link FutureUtils#result(CompletableFuture)} to handle known exception issues.
     */
    public static <T> T ioResult(CompletableFuture<T> result) throws IOException {
        return FutureUtils.result(
            result,
            (cause) -> {
                if (cause instanceof IOException) {
                    return (IOException) cause;
                } else if (cause instanceof KeeperException) {
                    return new ZKException("Encountered zookeeper exception on waiting result",
                        (KeeperException) cause);
                } else if (cause instanceof BKException) {
                    return new BKTransmitException("Encountered bookkeeper exception on waiting result",
                        ((BKException) cause).getCode());
                } else if (cause instanceof InterruptedException) {
                    return new DLInterruptedException("Interrupted on waiting result", cause);
                } else {
                    return new IOException("Encountered exception on waiting result", cause);
                }
            });
    }

    /**
     * A specific version of {@link FutureUtils#result(CompletableFuture, long, TimeUnit)}
     * to handle known exception issues.
     */
    public static <T> T ioResult(CompletableFuture<T> result, long timeout, TimeUnit timeUnit)
            throws IOException, TimeoutException {
        return FutureUtils.result(
            result,
            (cause) -> {
                if (cause instanceof IOException) {
                    return (IOException) cause;
                } else if (cause instanceof KeeperException) {
                    return new ZKException("Encountered zookeeper exception on waiting result",
                        (KeeperException) cause);
                } else if (cause instanceof BKException) {
                    return new BKTransmitException("Encountered bookkeeper exception on waiting result",
                        ((BKException) cause).getCode());
                } else if (cause instanceof InterruptedException) {
                    return new DLInterruptedException("Interrupted on waiting result", cause);
                } else {
                    return new IOException("Encountered exception on waiting result", cause);
                }
            },
            timeout,
            timeUnit);
    }

    /**
     * Abort async <i>abortable</i>
     *
     * @param abortable the {@code AsyncAbortable} object to be aborted, or null, in which case this method
     *                  does nothing.
     * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods
     * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException}
     */
    public static void abort(@Nullable AsyncAbortable abortable,
                             boolean swallowIOException)
            throws IOException {
        if (null == abortable) {
            return;
        }
        try {
            ioResult(abortable.asyncAbort());
        } catch (Exception ioe) {
            if (swallowIOException) {
                log.warn("IOException thrown while aborting Abortable {} : ", abortable, ioe);
            } else {
                throw ioe;
            }
        }
    }

}
