/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.twitter.distributedlog.impl.federated;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.callback.NamespaceListener;
import com.twitter.distributedlog.exceptions.LogExistsException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.impl.ZKNamespaceWatcher;
import com.twitter.distributedlog.metadata.LogMetadataStore;
import com.twitter.distributedlog.namespace.NamespaceWatcher;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Transaction;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.Charsets.UTF_8;

/**
 * A Federated ZooKeeper Based Log Metadata Store.
 *
 * To Upgrade a simple ZKLogMetadataStore to FederatedZKLogMetadataStore, following steps should be taken in sequence:
 * a) deploy the new code with disabling createStreamsIfNotExists in all writer.
 * b) once all proxies disable the flag, update namespace binding to enable federated namespace.
 * c) restart writers to take federated namespace in place.
 *
 * NOTE: current federated namespace isn't optimized for deletion/creation. so don't use it in the workloads
 *       that have lots of creations or deletions.
 */
public class FederatedZKLogMetadataStore extends NamespaceWatcher implements LogMetadataStore, Watcher, Runnable,
        FutureEventListener<Set<URI>> {

    static final Logger logger = LoggerFactory.getLogger(FederatedZKLogMetadataStore.class);

    private final static String ZNODE_SUB_NAMESPACES = ".subnamespaces";
    private final static String SUB_NAMESPACE_PREFIX = "NS_";

    /**
     * Create the federated namespace.
     *
     * @param namespace
     *          namespace to create
     * @param zkc
     *          zookeeper client
     * @throws InterruptedException
     * @throws ZooKeeperClient.ZooKeeperConnectionException
     * @throws KeeperException
     */
    public static void createFederatedNamespace(URI namespace, ZooKeeperClient zkc)
            throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
        String zkSubNamespacesPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES;
        Utils.zkCreateFullPathOptimistic(zkc, zkSubNamespacesPath, new byte[0],
                zkc.getDefaultACL(), CreateMode.PERSISTENT);
    }

    /**
     * Represent a sub namespace inside the federated namespace.
     */
    class SubNamespace implements NamespaceListener {
        final URI uri;
        final ZKNamespaceWatcher watcher;
        Promise<Set<String>> logsFuture = new Promise<Set<String>>();

        SubNamespace(URI uri) {
            this.uri = uri;
            this.watcher = new ZKNamespaceWatcher(conf, uri, zkc, scheduler);
            this.watcher.registerListener(this);
        }

        void watch() {
            this.watcher.watchNamespaceChanges();
        }

        synchronized Future<Set<String>> getLogs() {
            return logsFuture;
        }

        @Override
        public void onStreamsChanged(Iterator<String> newLogsIter) {
            Set<String> newLogs = Sets.newHashSet(newLogsIter);
            Set<String> oldLogs = Sets.newHashSet();

            // update the sub namespace cache
            Promise<Set<String>> newLogsPromise;
            synchronized (this) {
                if (logsFuture.isDefined()) { // the promise is already satisfied
                    try {
                        oldLogs = FutureUtils.result(logsFuture);
                    } catch (IOException e) {
                        logger.error("Unexpected exception when getting logs from a satisified future of {} : ",
                                uri, e);
                    }
                    logsFuture = new Promise<Set<String>>();
                }

                // update the reverse cache
                for (String logName : newLogs) {
                    URI oldURI = log2Locations.putIfAbsent(logName, uri);
                    if (null != oldURI && !Objects.equal(uri, oldURI)) {
                        logger.error("Log {} is found duplicated in multiple locations : old location = {}," +
                                " new location = {}", new Object[] { logName, oldURI, uri });
                        duplicatedLogFound.set(true);
                    }
                }

                // remove the gone streams
                Set<String> deletedLogs = Sets.difference(oldLogs, newLogs);
                for (String logName : deletedLogs) {
                    log2Locations.remove(logName, uri);
                }
                newLogsPromise = logsFuture;
            }
            newLogsPromise.setValue(newLogs);

            // notify namespace changes
            notifyOnNamespaceChanges();
        }
    }

    final DistributedLogConfiguration conf;
    final URI namespace;
    final ZooKeeperClient zkc;
    final OrderedScheduler scheduler;
    final String zkSubnamespacesPath;
    final AtomicBoolean duplicatedLogFound = new AtomicBoolean(false);
    final AtomicReference<String> duplicatedLogName = new AtomicReference<String>(null);
    final AtomicReference<Integer> zkSubnamespacesVersion = new AtomicReference<Integer>(null);

    final int maxLogsPerSubnamespace;
    // sub namespaces
    final ConcurrentSkipListMap<URI, SubNamespace> subNamespaces;
    // map between log name and its location
    final ConcurrentMap<String, URI> log2Locations;
    // final
    final boolean forceCheckLogExistence;

    public FederatedZKLogMetadataStore(
            DistributedLogConfiguration conf,
            URI namespace,
            ZooKeeperClient zkc,
            OrderedScheduler scheduler) throws IOException {
        this.conf = conf;
        this.namespace = namespace;
        this.zkc = zkc;
        this.scheduler = scheduler;
        this.forceCheckLogExistence = conf.getFederatedCheckExistenceWhenCacheMiss();
        this.subNamespaces = new ConcurrentSkipListMap<URI, SubNamespace>();
        this.log2Locations = new ConcurrentHashMap<String, URI>();
        this.zkSubnamespacesPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES;
        this.maxLogsPerSubnamespace = conf.getFederatedMaxLogsPerSubnamespace();

        // fetch the sub namespace
        Set<URI> uris = FutureUtils.result(fetchSubNamespaces(this));
        for (URI uri : uris) {
            SubNamespace subNs = new SubNamespace(uri);
            if (null == subNamespaces.putIfAbsent(uri, subNs)) {
                subNs.watch();
                logger.info("Watched sub namespace {}", uri);
            }
        }

        logger.info("Federated ZK LogMetadataStore is initialized for {}", namespace);
    }

    private void scheduleTask(Runnable r, long ms) {
        if (duplicatedLogFound.get()) {
            logger.error("Scheduler is halted for federated namespace {} as duplicated log found",
                    namespace);
            return;
        }
        try {
            scheduler.schedule(r, ms, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException ree) {
            logger.error("Task {} scheduled in {} ms is rejected : ", new Object[]{r, ms, ree});
        }
    }

    private <T> Future<T> postStateCheck(Future<T> future) {
        final Promise<T> postCheckedPromise = new Promise<T>();
        future.addEventListener(new FutureEventListener<T>() {
            @Override
            public void onSuccess(T value) {
                if (duplicatedLogFound.get()) {
                    postCheckedPromise.setException(new UnexpectedException("Duplicate log found under " + namespace));
                } else {
                    postCheckedPromise.setValue(value);
                }
            }

            @Override
            public void onFailure(Throwable cause) {
                postCheckedPromise.setException(cause);
            }
        });
        return postCheckedPromise;
    }

    //
    // SubNamespace Related Methods
    //

    @VisibleForTesting
    Set<URI> getSubnamespaces() {
        return subNamespaces.keySet();
    }

    @VisibleForTesting
    void removeLogFromCache(String logName) {
        log2Locations.remove(logName);
    }

    private URI getSubNamespaceURI(String ns) throws URISyntaxException {
        return new URI(
                namespace.getScheme(),
                namespace.getUserInfo(),
                namespace.getHost(),
                namespace.getPort(),
                namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + ns,
                namespace.getQuery(),
                namespace.getFragment());
    }

    Future<Set<URI>> getCachedSubNamespaces() {
        Set<URI> nsSet = subNamespaces.keySet();
        return Future.value(nsSet);
    }

    Future<Set<URI>> fetchSubNamespaces(final Watcher watcher) {
        final Promise<Set<URI>> promise = new Promise<Set<URI>>();
        try {
            zkc.get().sync(this.zkSubnamespacesPath, new AsyncCallback.VoidCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx) {
                    if (Code.OK.intValue() == rc) {
                        fetchSubNamespaces(watcher, promise);
                    } else {
                        promise.setException(KeeperException.create(Code.get(rc)));
                    }
                }
            }, null);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            promise.setException(e);
        } catch (InterruptedException e) {
            promise.setException(e);
        }
        return promise;
    }

    private void fetchSubNamespaces(Watcher watcher,
                                    final Promise<Set<URI>> promise) {
        try {
            zkc.get().getChildren(this.zkSubnamespacesPath, watcher,
                    new AsyncCallback.Children2Callback() {
                        @Override
                        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                            if (Code.NONODE.intValue() == rc) {
                                promise.setException(new UnexpectedException(
                                        "The subnamespaces don't exist for the federated namespace " + namespace));
                            } else if (Code.OK.intValue() == rc) {
                                Set<URI> subnamespaces = Sets.newHashSet();
                                subnamespaces.add(namespace);
                                try {
                                    for (String ns : children) {
                                        subnamespaces.add(getSubNamespaceURI(ns));
                                    }
                                } catch (URISyntaxException use) {
                                    logger.error("Invalid sub namespace uri found : ", use);
                                    promise.setException(new UnexpectedException(
                                            "Invalid sub namespace uri found in " + namespace, use));
                                    return;
                                }
                                // update the sub namespaces set before update version
                                setZkSubnamespacesVersion(stat.getVersion());
                                promise.setValue(subnamespaces);
                            }
                        }
                    }, null);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            promise.setException(e);
        } catch (InterruptedException e) {
            promise.setException(e);
        }
    }

    @Override
    public void run() {
        fetchSubNamespaces(this).addEventListener(this);
    }

    @Override
    public void onSuccess(Set<URI> uris) {
        for (URI uri : uris) {
            if (subNamespaces.containsKey(uri)) {
                continue;
            }
            SubNamespace subNs = new SubNamespace(uri);
            if (null == subNamespaces.putIfAbsent(uri, subNs)) {
                subNs.watch();
                logger.info("Watched new sub namespace {}.", uri);
                notifyOnNamespaceChanges();
            }
        }
    }

    @Override
    public void onFailure(Throwable cause) {
        // failed to fetch namespaces, retry later
        scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        if (Event.EventType.None == watchedEvent.getType() &&
                Event.KeeperState.Expired == watchedEvent.getState()) {
            scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
            return;
        }
        if (Event.EventType.NodeChildrenChanged == watchedEvent.getType()) {
            // fetch the namespace
            fetchSubNamespaces(this).addEventListener(this);
        }
    }

    //
    // Log Related Methods
    //

    private <A> Future<A> duplicatedLogException(String logName) {
        return Future.exception(new UnexpectedException("Duplicated log " + logName
                + " found in namespace " + namespace));
    }

    @Override
    public Future<URI> createLog(final String logName) {
        if (duplicatedLogFound.get()) {
            return duplicatedLogException(duplicatedLogName.get());
        }
        Promise<URI> createPromise = new Promise<URI>();
        doCreateLog(logName, createPromise);
        return postStateCheck(createPromise);
    }

    void doCreateLog(final String logName, final Promise<URI> createPromise) {
        getLogLocation(logName).addEventListener(new FutureEventListener<Optional<URI>>() {
            @Override
            public void onSuccess(Optional<URI> uriOptional) {
                if (uriOptional.isPresent()) {
                    createPromise.setException(new LogExistsException("Log " + logName + " already exists in " + uriOptional.get()));
                } else {
                    getCachedSubNamespacesAndCreateLog(logName, createPromise);
                }
            }

            @Override
            public void onFailure(Throwable cause) {
                createPromise.setException(cause);
            }
        });
    }

    private void getCachedSubNamespacesAndCreateLog(final String logName,
                                                    final Promise<URI> createPromise) {
        getCachedSubNamespaces().addEventListener(new FutureEventListener<Set<URI>>() {
            @Override
            public void onSuccess(Set<URI> uris) {
                findSubNamespaceToCreateLog(logName, uris, createPromise);
            }

            @Override
            public void onFailure(Throwable cause) {
                createPromise.setException(cause);
            }
        });
    }

    private void fetchSubNamespacesAndCreateLog(final String logName,
                                                final Promise<URI> createPromise) {
        fetchSubNamespaces(null).addEventListener(new FutureEventListener<Set<URI>>() {
            @Override
            public void onSuccess(Set<URI> uris) {
                findSubNamespaceToCreateLog(logName, uris, createPromise);
            }

            @Override
            public void onFailure(Throwable cause) {
                createPromise.setException(cause);
            }
        });
    }

    private void findSubNamespaceToCreateLog(final String logName,
                                             final Set<URI> uris,
                                             final Promise<URI> createPromise) {
        final List<URI> uriList = Lists.newArrayListWithExpectedSize(uris.size());
        List<Future<Set<String>>> futureList = Lists.newArrayListWithExpectedSize(uris.size());
        for (URI uri : uris) {
            SubNamespace subNs = subNamespaces.get(uri);
            if (null == subNs) {
                createPromise.setException(new UnexpectedException("No sub namespace " + uri + " found"));
                return;
            }
            futureList.add(subNs.getLogs());
            uriList.add(uri);
        }
        Future.collect(futureList).addEventListener(new FutureEventListener<List<Set<String>>>() {
            @Override
            public void onSuccess(List<Set<String>> resultList) {
                for (int i = resultList.size() - 1; i >= 0; i--) {
                    Set<String> logs = resultList.get(i);
                    if (logs.size() < maxLogsPerSubnamespace) {
                        URI uri = uriList.get(i);
                        createLogInNamespace(uri, logName, createPromise);
                        return;
                    }
                }
                // All sub namespaces are full
                createSubNamespace().addEventListener(new FutureEventListener<URI>() {
                    @Override
                    public void onSuccess(URI uri) {
                        // the new namespace will be propagated to the namespace cache by the namespace listener
                        // so we don't need to cache it here. we could go ahead to create the stream under this
                        // namespace, as we are using sequential znode. we are mostly the first guy who create
                        // the log under this namespace.
                        createLogInNamespace(uri, logName, createPromise);
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        createPromise.setException(cause);
                    }
                });
            }

            @Override
            public void onFailure(Throwable cause) {
                createPromise.setException(cause);
            }
        });
    }

    private String getNamespaceFromZkPath(String zkPath) throws UnexpectedException {
        String parts[] = zkPath.split(SUB_NAMESPACE_PREFIX);
        if (parts.length <= 0) {
            throw new UnexpectedException("Invalid namespace @ " + zkPath);
        }
        return SUB_NAMESPACE_PREFIX + parts[parts.length - 1];
    }

    Future<URI> createSubNamespace() {
        final Promise<URI> promise = new Promise<URI>();

        final String nsPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + SUB_NAMESPACE_PREFIX;
        try {
            zkc.get().create(nsPath, new byte[0], zkc.getDefaultACL(), CreateMode.PERSISTENT_SEQUENTIAL,
                    new AsyncCallback.StringCallback() {
                        @Override
                        public void processResult(int rc, String path, Object ctx, String name) {
                            if (Code.OK.intValue() == rc) {
                                try {
                                    URI newUri = getSubNamespaceURI(getNamespaceFromZkPath(name));
                                    logger.info("Created sub namespace {}", newUri);
                                    promise.setValue(newUri);
                                } catch (UnexpectedException ue) {
                                    promise.setException(ue);
                                } catch (URISyntaxException e) {
                                    promise.setException(new UnexpectedException("Invalid namespace " + name + " is created."));
                                }
                            } else {
                                promise.setException(KeeperException.create(Code.get(rc)));
                            }
                        }
                    }, null);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            promise.setException(e);
        } catch (InterruptedException e) {
            promise.setException(e);
        }

        return promise;
    }

    /**
     * Create a log under the namespace. To guarantee there is only one creation happens at time
     * in a federated namespace, we use CAS operation in zookeeper.
     *
     * @param uri
     *          namespace
     * @param logName
     *          name of the log
     * @param createPromise
     *          the promise representing the creation result.
     */
    private void createLogInNamespace(final URI uri,
                                      final String logName,
                                      final Promise<URI> createPromise) {
        // TODO: rewrite this after we bump to zk 3.5, where we will have asynchronous version of multi
        scheduler.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    createLogInNamespaceSync(uri, logName);
                    createPromise.setValue(uri);
                } catch (InterruptedException e) {
                    createPromise.setException(e);
                } catch (IOException e) {
                    createPromise.setException(e);
                } catch (KeeperException.BadVersionException bve) {
                    fetchSubNamespacesAndCreateLog(logName, createPromise);
                } catch (KeeperException e) {
                    createPromise.setException(e);
                }
            }
        });
    }

    void createLogInNamespaceSync(URI uri, String logName)
            throws InterruptedException, IOException, KeeperException {
        Transaction txn = zkc.get().transaction();
        // we don't have the zk version yet. set it to 0 instead of -1, to prevent non CAS operation.
        int zkVersion = null == zkSubnamespacesVersion.get() ? 0 : zkSubnamespacesVersion.get();
        txn.setData(zkSubnamespacesPath, uri.getPath().getBytes(UTF_8), zkVersion);
        String logPath = uri.getPath() + "/" + logName;
        txn.create(logPath, new byte[0], zkc.getDefaultACL(), CreateMode.PERSISTENT);
        try {
            txn.commit();
            // if the transaction succeed, the zk version is advanced
            setZkSubnamespacesVersion(zkVersion + 1);
        } catch (KeeperException ke) {
            List<OpResult> opResults = ke.getResults();
            OpResult createResult = opResults.get(1);
            if (createResult instanceof OpResult.ErrorResult) {
                OpResult.ErrorResult errorResult = (OpResult.ErrorResult) createResult;
                if (Code.NODEEXISTS.intValue() == errorResult.getErr()) {
                    throw new LogExistsException("Log " + logName + " already exists");
                }
            }
            OpResult setResult = opResults.get(0);
            if (setResult instanceof OpResult.ErrorResult) {
                OpResult.ErrorResult errorResult = (OpResult.ErrorResult) setResult;
                if (Code.BADVERSION.intValue() == errorResult.getErr()) {
                    throw KeeperException.create(Code.BADVERSION);
                }
            }
            throw new ZKException("ZK exception in creating log " + logName + " in " + uri, ke);
        }
    }

    void setZkSubnamespacesVersion(int zkVersion) {
        Integer oldVersion;
        boolean done = false;
        while (!done) {
            oldVersion = zkSubnamespacesVersion.get();
            if (null == oldVersion) {
                done = zkSubnamespacesVersion.compareAndSet(null, zkVersion);
                continue;
            }
            if (oldVersion < zkVersion) {
                done = zkSubnamespacesVersion.compareAndSet(oldVersion, zkVersion);
                continue;
            } else {
                done = true;
            }
        }
    }

    @Override
    public Future<Optional<URI>> getLogLocation(final String logName) {
        if (duplicatedLogFound.get()) {
            return duplicatedLogException(duplicatedLogName.get());
        }
        URI location = log2Locations.get(logName);
        if (null != location) {
            return postStateCheck(Future.value(Optional.of(location)));
        }
        if (!forceCheckLogExistence) {
            Optional<URI> result = Optional.absent();
            return Future.value(result);
        }
        return postStateCheck(fetchLogLocation(logName).onSuccess(
                new AbstractFunction1<Optional<URI>, BoxedUnit>() {
                    @Override
                    public BoxedUnit apply(Optional<URI> uriOptional) {
                        if (uriOptional.isPresent()) {
                            log2Locations.putIfAbsent(logName, uriOptional.get());
                        }
                        return BoxedUnit.UNIT;
                    }
                }));
    }

    private Future<Optional<URI>> fetchLogLocation(final String logName) {
        final Promise<Optional<URI>> fetchPromise = new Promise<Optional<URI>>();

        Set<URI> uris = subNamespaces.keySet();
        List<Future<Optional<URI>>> fetchFutures = Lists.newArrayListWithExpectedSize(uris.size());
        for (URI uri : uris) {
            fetchFutures.add(fetchLogLocation(uri, logName));
        }
        Future.collect(fetchFutures).addEventListener(new FutureEventListener<List<Optional<URI>>>() {
            @Override
            public void onSuccess(List<Optional<URI>> fetchResults) {
                Optional<URI> result = Optional.absent();
                for (Optional<URI> fetchResult : fetchResults) {
                    if (result.isPresent()) {
                        if (fetchResult.isPresent()) {
                            logger.error("Log {} is found in multiple sub namespaces : {} & {}.",
                                    new Object[] { logName, result.get(), fetchResult.get() });
                            duplicatedLogName.compareAndSet(null, logName);
                            duplicatedLogFound.set(true);
                            fetchPromise.setException(new UnexpectedException("Log " + logName
                                    + " is found in multiple sub namespaces : "
                                    + result.get() + " & " + fetchResult.get()));
                            return;
                        }
                    } else {
                        result = fetchResult;
                    }
                }
                fetchPromise.setValue(result);
            }

            @Override
            public void onFailure(Throwable cause) {
                fetchPromise.setException(cause);
            }
        });
        return fetchPromise;
    }

    private Future<Optional<URI>> fetchLogLocation(final URI uri, String logName) {
        final Promise<Optional<URI>> fetchPromise = new Promise<Optional<URI>>();
        final String logRootPath = uri.getPath() + "/" + logName;
        try {
            zkc.get().exists(logRootPath, false, new AsyncCallback.StatCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx, Stat stat) {
                    if (Code.OK.intValue() == rc) {
                        fetchPromise.setValue(Optional.of(uri));
                    } else if (Code.NONODE.intValue() == rc) {
                        fetchPromise.setValue(Optional.<URI>absent());
                    } else {
                        fetchPromise.setException(KeeperException.create(Code.get(rc)));
                    }
                }
            }, null);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            fetchPromise.setException(e);
        } catch (InterruptedException e) {
            fetchPromise.setException(e);
        }
        return fetchPromise;
    }

    @Override
    public Future<Iterator<String>> getLogs() {
        if (duplicatedLogFound.get()) {
            return duplicatedLogException(duplicatedLogName.get());
        }
        return postStateCheck(retrieveLogs().map(
                new AbstractFunction1<List<Set<String>>, Iterator<String>>() {
                    @Override
                    public Iterator<String> apply(List<Set<String>> resultList) {
                        return getIterator(resultList);
                    }
                }));
    }

    private Future<List<Set<String>>> retrieveLogs() {
        Collection<SubNamespace> subNss = subNamespaces.values();
        List<Future<Set<String>>> logsList = Lists.newArrayListWithExpectedSize(subNss.size());
        for (SubNamespace subNs : subNss) {
            logsList.add(subNs.getLogs());
        }
        return Future.collect(logsList);
    }

    private Iterator<String> getIterator(List<Set<String>> resultList) {
        List<Iterator<String>> iterList = Lists.newArrayListWithExpectedSize(resultList.size());
        for (Set<String> result : resultList) {
            iterList.add(result.iterator());
        }
        return Iterators.concat(iterList.iterator());
    }

    @Override
    public void registerNamespaceListener(NamespaceListener listener) {
        registerListener(listener);
    }

    @Override
    protected void watchNamespaceChanges() {
        // as the federated namespace already started watching namespace changes,
        // we don't need to do any actions here
    }

    private void notifyOnNamespaceChanges() {
        retrieveLogs().onSuccess(new AbstractFunction1<List<Set<String>>, BoxedUnit>() {
            @Override
            public BoxedUnit apply(List<Set<String>> resultList) {
                for (NamespaceListener listener : listeners) {
                    listener.onStreamsChanged(getIterator(resultList));
                }
                return BoxedUnit.UNIT;
            }
        });
    }
}
