/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.client;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecordSetBuffer;
import org.apache.distributedlog.client.monitor.MonitorServiceClient;
import org.apache.distributedlog.client.ownership.OwnershipCache;
import org.apache.distributedlog.client.proxy.ClusterClient;
import org.apache.distributedlog.client.proxy.HostProvider;
import org.apache.distributedlog.client.proxy.ProxyClient;
import org.apache.distributedlog.client.proxy.ProxyClientManager;
import org.apache.distributedlog.client.proxy.ProxyListener;
import org.apache.distributedlog.client.resolver.RegionResolver;
import org.apache.distributedlog.client.routing.RoutingService;
import org.apache.distributedlog.client.routing.RoutingService.RoutingContext;
import org.apache.distributedlog.client.stats.ClientStats;
import org.apache.distributedlog.client.stats.OpStats;
import org.apache.distributedlog.exceptions.DLClientClosedException;
import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.exceptions.ServiceUnavailableException;
import org.apache.distributedlog.exceptions.StreamUnavailableException;
import org.apache.distributedlog.service.DLSocketAddress;
import org.apache.distributedlog.service.DistributedLogClient;
import org.apache.distributedlog.thrift.service.BulkWriteResponse;
import org.apache.distributedlog.thrift.service.HeartbeatOptions;
import org.apache.distributedlog.thrift.service.ResponseHeader;
import org.apache.distributedlog.thrift.service.ServerInfo;
import org.apache.distributedlog.thrift.service.ServerStatus;
import org.apache.distributedlog.thrift.service.StatusCode;
import org.apache.distributedlog.thrift.service.WriteContext;
import org.apache.distributedlog.thrift.service.WriteResponse;
import org.apache.distributedlog.util.ProtocolUtils;
import com.twitter.finagle.CancelledRequestException;
import com.twitter.finagle.ConnectionFailedException;
import com.twitter.finagle.Failure;
import com.twitter.finagle.NoBrokersAvailableException;
import com.twitter.finagle.RequestTimeoutException;
import com.twitter.finagle.ServiceException;
import com.twitter.finagle.ServiceTimeoutException;
import com.twitter.finagle.WriteException;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientId;
import com.twitter.util.Duration;
import com.twitter.util.Function;
import com.twitter.util.Function0;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import com.twitter.util.Return;
import com.twitter.util.Throw;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.thrift.TApplicationException;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;
import scala.runtime.AbstractFunction1;


/**
 * Implementation of distributedlog client.
 */
public class DistributedLogClientImpl implements DistributedLogClient, MonitorServiceClient,
        RoutingService.RoutingListener, ProxyListener, HostProvider {

    private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class);

    private final String clientName;
    private final ClientId clientId;
    private final ClientConfig clientConfig;
    private final RoutingService routingService;
    private final ProxyClient.Builder clientBuilder;
    private final boolean streamFailfast;
    private final Pattern streamNameRegexPattern;

    // Timer
    private final HashedWheelTimer dlTimer;

    // region resolver
    private final RegionResolver regionResolver;

    // Ownership maintenance
    private final OwnershipCache ownershipCache;
    // Channel/Client management
    private final ProxyClientManager clientManager;
    // Cluster Client (for routing service)
    private final Optional<ClusterClient> clusterClient;

    // Close Status
    private boolean closed = false;
    private final ReentrantReadWriteLock closeLock =
            new ReentrantReadWriteLock();

    abstract class StreamOp implements TimerTask {
        final String stream;

        final AtomicInteger tries = new AtomicInteger(0);
        final RoutingContext routingContext = RoutingContext.of(regionResolver);
        final WriteContext ctx = new WriteContext();
        final Stopwatch stopwatch;
        final OpStats opStats;
        SocketAddress nextAddressToSend;

        StreamOp(final String stream, final OpStats opStats) {
            this.stream = stream;
            this.stopwatch = Stopwatch.createStarted();
            this.opStats = opStats;
        }

        boolean shouldTimeout() {
            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            return shouldTimeout(elapsedMs);
        }

        boolean shouldTimeout(long elapsedMs) {
            return clientConfig.getRequestTimeoutMs() > 0
                && elapsedMs >= clientConfig.getRequestTimeoutMs();
        }

        void send(SocketAddress address) {
            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            if (clientConfig.getMaxRedirects() > 0
                && tries.get() >= clientConfig.getMaxRedirects()) {
                fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
                        "Exhausted max redirects in " + elapsedMs + " ms"));
                return;
            } else if (shouldTimeout(elapsedMs)) {
                fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
                        "Exhausted max request timeout " + clientConfig.getRequestTimeoutMs()
                                + " in " + elapsedMs + " ms"));
                return;
            }
            synchronized (this) {
                String addrStr = address.toString();
                if (ctx.isSetTriedHosts() && ctx.getTriedHosts().contains(addrStr)) {
                    nextAddressToSend = address;
                    dlTimer.newTimeout(this,
                            Math.min(clientConfig.getRedirectBackoffMaxMs(),
                                    tries.get() * clientConfig.getRedirectBackoffStartMs()),
                            TimeUnit.MILLISECONDS);
                } else {
                    doSend(address);
                }
            }
        }

        abstract Future<ResponseHeader> sendRequest(ProxyClient sc);

        void doSend(SocketAddress address) {
            ctx.addToTriedHosts(address.toString());
            if (clientConfig.isChecksumEnabled()) {
                Long crc32 = computeChecksum();
                if (null != crc32) {
                    ctx.setCrc32(crc32);
                }
            }
            tries.incrementAndGet();
            sendWriteRequest(address, this);
        }

        void beforeComplete(ProxyClient sc, ResponseHeader responseHeader) {
            ownershipCache.updateOwner(stream, sc.getAddress());
        }

        void complete(SocketAddress address) {
            stopwatch.stop();
            opStats.completeRequest(address,
                    stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get());
        }

        void fail(SocketAddress address, Throwable t) {
            stopwatch.stop();
            opStats.failRequest(address,
                    stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get());
        }

        Long computeChecksum() {
            return null;
        }

        @Override
        public synchronized void run(Timeout timeout) throws Exception {
            if (!timeout.isCancelled() && null != nextAddressToSend) {
                doSend(nextAddressToSend);
            } else {
                fail(null, new CancelledRequestException());
            }
        }
    }

    class BulkWriteOp extends StreamOp {

        final List<ByteBuffer> data;
        final ArrayList<Promise<DLSN>> results;

        BulkWriteOp(final String name, final List<ByteBuffer> data) {
            super(name, clientStats.getOpStats("bulk_write"));
            this.data = data;

            // This could take a while (relatively speaking) for very large inputs. We probably don't want
            // to go so large for other reasons though.
            this.results = new ArrayList<Promise<DLSN>>(data.size());
            for (int i = 0; i < data.size(); i++) {
                checkNotNull(data.get(i));
                this.results.add(new Promise<DLSN>());
            }
        }

        @Override
        Future<ResponseHeader> sendRequest(final ProxyClient sc) {
            return sc.getService().writeBulkWithContext(stream, data, ctx)
                .addEventListener(new FutureEventListener<BulkWriteResponse>() {
                @Override
                public void onSuccess(BulkWriteResponse response) {
                    // For non-success case, the ResponseHeader handler (the caller) will handle it.
                    // Note success in this case means no finagle errors have occurred
                    // (such as finagle connection issues). In general code != SUCCESS means there's some error
                    // reported by dlog service. The caller will handle such errors.
                    if (response.getHeader().getCode() == StatusCode.SUCCESS) {
                        beforeComplete(sc, response.getHeader());
                        BulkWriteOp.this.complete(sc.getAddress(), response);
                        if (response.getWriteResponses().size() == 0 && data.size() > 0) {
                            logger.error("non-empty bulk write got back empty response without failure for stream {}",
                                stream);
                        }
                    }
                }
                @Override
                public void onFailure(Throwable cause) {
                    // Handled by the ResponseHeader listener (attached by the caller).
                }
            }).map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() {
                @Override
                public ResponseHeader apply(BulkWriteResponse response) {
                    // We need to return the ResponseHeader to the caller's listener to process DLOG errors.
                    return response.getHeader();
                }
            });
        }

        void complete(SocketAddress address, BulkWriteResponse bulkWriteResponse) {
            super.complete(address);
            Iterator<WriteResponse> writeResponseIterator = bulkWriteResponse.getWriteResponses().iterator();
            Iterator<Promise<DLSN>> resultIterator = results.iterator();

            // Fill in errors from thrift responses.
            while (resultIterator.hasNext() && writeResponseIterator.hasNext()) {
                Promise<DLSN> result = resultIterator.next();
                WriteResponse writeResponse = writeResponseIterator.next();
                if (StatusCode.SUCCESS == writeResponse.getHeader().getCode()) {
                    result.setValue(DLSN.deserialize(writeResponse.getDlsn()));
                } else {
                    result.setException(DLException.of(writeResponse.getHeader()));
                }
            }

            // Should never happen, but just in case so there's some record.
            if (bulkWriteResponse.getWriteResponses().size() != data.size()) {
                logger.error("wrong number of results, response = {} records = {}",
                    bulkWriteResponse.getWriteResponses().size(), data.size());
            }
        }

        @Override
        void fail(SocketAddress address, Throwable t) {

            // StreamOp.fail is called to fail the overall request. In case of BulkWriteOp we take the request level
            // exception to apply to the first write. In fact for request level exceptions no request has ever been
            // attempted, but logically we associate the error with the first write.
            super.fail(address, t);
            Iterator<Promise<DLSN>> resultIterator = results.iterator();

            // Fail the first write with the batch level failure.
            if (resultIterator.hasNext()) {
                Promise<DLSN> result = resultIterator.next();
                result.setException(t);
            }

            // Fail the remaining writes as cancelled requests.
            while (resultIterator.hasNext()) {
                Promise<DLSN> result = resultIterator.next();
                result.setException(new CancelledRequestException());
            }
        }

        @SuppressWarnings("unchecked")
        List<Future<DLSN>> result() {
            return (List) results;
        }
    }

    abstract class AbstractWriteOp extends StreamOp {

        final Promise<WriteResponse> result = new Promise<WriteResponse>();
        Long crc32 = null;

        AbstractWriteOp(final String name, final OpStats opStats) {
            super(name, opStats);
        }

        void complete(SocketAddress address, WriteResponse response) {
            super.complete(address);
            result.setValue(response);
        }

        @Override
        void fail(SocketAddress address, Throwable t) {
            super.fail(address, t);
            result.setException(t);
        }

        @Override
        Long computeChecksum() {
            if (null == crc32) {
                crc32 = ProtocolUtils.streamOpCRC32(stream);
            }
            return crc32;
        }

        @Override
        Future<ResponseHeader> sendRequest(final ProxyClient sc) {
            return this.sendWriteRequest(sc).addEventListener(new FutureEventListener<WriteResponse>() {
                @Override
                public void onSuccess(WriteResponse response) {
                    if (response.getHeader().getCode() == StatusCode.SUCCESS) {
                        beforeComplete(sc, response.getHeader());
                        AbstractWriteOp.this.complete(sc.getAddress(), response);
                    }
                }
                @Override
                public void onFailure(Throwable cause) {
                    // handled by the ResponseHeader listener
                }
            }).map(new AbstractFunction1<WriteResponse, ResponseHeader>() {
                @Override
                public ResponseHeader apply(WriteResponse response) {
                    return response.getHeader();
                }
            });
        }

        abstract Future<WriteResponse> sendWriteRequest(ProxyClient sc);
    }

    class WriteOp extends AbstractWriteOp {
        final ByteBuffer data;

        WriteOp(final String name, final ByteBuffer data) {
            super(name, clientStats.getOpStats("write"));
            this.data = data;
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().writeWithContext(stream, data, ctx);
        }

        @Override
        Long computeChecksum() {
            if (null == crc32) {
                byte[] dataBytes = new byte[data.remaining()];
                data.duplicate().get(dataBytes);
                crc32 = ProtocolUtils.writeOpCRC32(stream, dataBytes);
            }
            return crc32;
        }

        Future<DLSN> result() {
            return result.map(new AbstractFunction1<WriteResponse, DLSN>() {
                @Override
                public DLSN apply(WriteResponse response) {
                    return DLSN.deserialize(response.getDlsn());
                }
            });
        }
    }

    class TruncateOp extends AbstractWriteOp {
        final DLSN dlsn;

        TruncateOp(String name, DLSN dlsn) {
            super(name, clientStats.getOpStats("truncate"));
            this.dlsn = dlsn;
        }

        @Override
        Long computeChecksum() {
            if (null == crc32) {
                crc32 = ProtocolUtils.truncateOpCRC32(stream, dlsn);
            }
            return crc32;
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().truncate(stream, dlsn.serialize(), ctx);
        }

        Future<Boolean> result() {
            return result.map(new AbstractFunction1<WriteResponse, Boolean>() {
                @Override
                public Boolean apply(WriteResponse response) {
                    return true;
                }
            });
        }
    }

    class WriteRecordSetOp extends WriteOp {

        WriteRecordSetOp(String name, LogRecordSetBuffer recordSet) {
            super(name, recordSet.getBuffer());
            ctx.setIsRecordSet(true);
        }

    }


    class ReleaseOp extends AbstractWriteOp {

        ReleaseOp(String name) {
            super(name, clientStats.getOpStats("release"));
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().release(stream, ctx);
        }

        @Override
        void beforeComplete(ProxyClient sc, ResponseHeader header) {
            ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted");
        }

        Future<Void> result() {
            return result.map(new AbstractFunction1<WriteResponse, Void>() {
                @Override
                public Void apply(WriteResponse response) {
                    return null;
                }
            });
        }
    }

    class DeleteOp extends AbstractWriteOp {

        DeleteOp(String name) {
            super(name, clientStats.getOpStats("delete"));
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().delete(stream, ctx);
        }

        @Override
        void beforeComplete(ProxyClient sc, ResponseHeader header) {
            ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted");
        }

        Future<Void> result() {
            return result.map(new AbstractFunction1<WriteResponse, Void>() {
                @Override
                public Void apply(WriteResponse v1) {
                    return null;
                }
            });
        }
    }

    class CreateOp extends AbstractWriteOp {

        CreateOp(String name) {
            super(name, clientStats.getOpStats("create"));
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().create(stream, ctx);
        }

        @Override
        void beforeComplete(ProxyClient sc, ResponseHeader header) {
            ownershipCache.updateOwner(stream, sc.getAddress());
        }

        Future<Void> result() {
            return result.map(new AbstractFunction1<WriteResponse, Void>() {
                @Override
                public Void apply(WriteResponse v1) {
                    return null;
                }
            }).voided();
        }
    }

    class HeartbeatOp extends AbstractWriteOp {
        HeartbeatOptions options;

        HeartbeatOp(String name, boolean sendReaderHeartBeat) {
            super(name, clientStats.getOpStats("heartbeat"));
            options = new HeartbeatOptions();
            options.setSendHeartBeatToReader(sendReaderHeartBeat);
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().heartbeatWithOptions(stream, ctx, options);
        }

        Future<Void> result() {
            return result.map(new AbstractFunction1<WriteResponse, Void>() {
                @Override
                public Void apply(WriteResponse response) {
                    return null;
                }
            });
        }
    }

    // Stats
    private final ClientStats clientStats;

    public DistributedLogClientImpl(String name,
                                    ClientId clientId,
                                    RoutingService routingService,
                                    ClientBuilder clientBuilder,
                                    ClientConfig clientConfig,
                                    Optional<ClusterClient> clusterClient,
                                    StatsReceiver statsReceiver,
                                    StatsReceiver streamStatsReceiver,
                                    RegionResolver regionResolver,
                                    boolean enableRegionStats) {
        this.clientName = name;
        this.clientId = clientId;
        this.routingService = routingService;
        this.clientConfig = clientConfig;
        this.streamFailfast = clientConfig.getStreamFailfast();
        this.streamNameRegexPattern = Pattern.compile(clientConfig.getStreamNameRegex());
        this.regionResolver = regionResolver;
        // Build the timer
        this.dlTimer = new HashedWheelTimer(
                new ThreadFactoryBuilder().setNameFormat("DLClient-" + name + "-timer-%d").build(),
                this.clientConfig.getRedirectBackoffStartMs(),
                TimeUnit.MILLISECONDS);
        // register routing listener
        this.routingService.registerListener(this);
        // build the ownership cache
        this.ownershipCache = new OwnershipCache(this.clientConfig, this.dlTimer, statsReceiver, streamStatsReceiver);
        // Client Stats
        this.clientStats = new ClientStats(statsReceiver, enableRegionStats, regionResolver);
        // Client Manager
        this.clientBuilder = ProxyClient.newBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats);
        this.clientManager = new ProxyClientManager(
                this.clientConfig,  // client config
                this.clientBuilder, // client builder
                this.dlTimer,       // timer
                this,               // host provider
                clientStats);       // client stats
        this.clusterClient = clusterClient;
        this.clientManager.registerProxyListener(this);

        // Cache Stats
        StatsReceiver cacheStatReceiver = statsReceiver.scope("cache");
        Seq<String> numCachedStreamsGaugeName =
                scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_streams")).toList();
        cacheStatReceiver.provideGauge(numCachedStreamsGaugeName, new Function0<Object>() {
            @Override
            public Object apply() {
                return (float) ownershipCache.getNumCachedStreams();
            }
        });
        Seq<String> numCachedHostsGaugeName =
                scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_hosts")).toList();
        cacheStatReceiver.provideGauge(numCachedHostsGaugeName, new Function0<Object>() {
            @Override
            public Object apply() {
                return (float) clientManager.getNumProxies();
            }
        });

        logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {},"
            + " stats_receiver = {}, thriftmux = {}",
            new Object[] {
                name,
                clientId,
                routingService.getClass(),
                statsReceiver.getClass(),
                clientConfig.getThriftMux()
            });
    }

    @Override
    public Set<SocketAddress> getHosts() {
        Set<SocketAddress> hosts = Sets.newHashSet();
        // if using server side routing, we only handshake with the hosts in ownership cache.
        if (!clusterClient.isPresent()) {
            hosts.addAll(this.routingService.getHosts());
        }
        hosts.addAll(this.ownershipCache.getStreamOwnershipDistribution().keySet());
        return hosts;
    }

    @Override
    public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
        if (null != serverInfo
            && serverInfo.isSetServerStatus()
            && ServerStatus.DOWN == serverInfo.getServerStatus()) {
            logger.info("{} is detected as DOWN during handshaking", address);
            // server is shutting down
            handleServiceUnavailable(address, client, Optional.<StreamOp>absent());
            return;
        }

        if (null != serverInfo && serverInfo.isSetOwnerships()) {
            Map<String, String> ownerships = serverInfo.getOwnerships();
            logger.debug("Handshaked with {} : {} ownerships returned.", address, ownerships.size());
            for (Map.Entry<String, String> entry : ownerships.entrySet()) {
                Matcher matcher = streamNameRegexPattern.matcher(entry.getKey());
                if (!matcher.matches()) {
                    continue;
                }
                updateOwnership(entry.getKey(), entry.getValue());
            }
        } else {
            logger.debug("Handshaked with {} : no ownerships returned", address);
        }
    }

    @Override
    public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
        cause = showRootCause(Optional.<StreamOp>absent(), cause);
        handleRequestException(address, client, Optional.<StreamOp>absent(), cause);
    }

    @VisibleForTesting
    public void handshake() {
        clientManager.handshake();
        logger.info("Handshaked with {} hosts, cached {} streams",
                clientManager.getNumProxies(), ownershipCache.getNumCachedStreams());
    }

    @Override
    public void onServerLeft(SocketAddress address) {
        onServerLeft(address, null);
    }

    private void onServerLeft(SocketAddress address, ProxyClient sc) {
        ownershipCache.removeAllStreamsFromOwner(address);
        if (null == sc) {
            clientManager.removeClient(address);
        } else {
            clientManager.removeClient(address, sc);
        }
    }

    @Override
    public void onServerJoin(SocketAddress address) {
        // we only pre-create connection for client-side routing
        // if it is server side routing, we only know the exact proxy address
        // when #getOwner.
        if (!clusterClient.isPresent()) {
            clientManager.createClient(address);
        }
    }

    public void close() {
        closeLock.writeLock().lock();
        try {
            if (closed) {
                return;
            }
            closed = true;
        } finally {
            closeLock.writeLock().unlock();
        }
        clientManager.close();
        routingService.unregisterListener(this);
        routingService.stopService();
        dlTimer.stop();
    }

    @Override
    public Future<Void> check(String stream) {
        final HeartbeatOp op = new HeartbeatOp(stream, false);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Future<Void> heartbeat(String stream) {
        final HeartbeatOp op = new HeartbeatOp(stream, true);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() {
        return ownershipCache.getStreamOwnershipDistribution();
    }

    @Override
    public Future<Void> setAcceptNewStream(boolean enabled) {
        Map<SocketAddress, ProxyClient> snapshot = clientManager.getAllClients();
        List<Future<Void>> futures = new ArrayList<Future<Void>>(snapshot.size());
        for (Map.Entry<SocketAddress, ProxyClient> entry : snapshot.entrySet()) {
            futures.add(entry.getValue().getService().setAcceptNewStream(enabled));
        }
        return Future.collect(futures).map(new Function<List<Void>, Void>() {
            @Override
            public Void apply(List<Void> list) {
                return null;
            }
        });
    }

    @Override
    public Future<DLSN> write(String stream, ByteBuffer data) {
        final WriteOp op = new WriteOp(stream, data);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Future<DLSN> writeRecordSet(String stream, final LogRecordSetBuffer recordSet) {
        final WriteRecordSetOp op = new WriteRecordSetOp(stream, recordSet);
        sendRequest(op);
        return op.result();
    }

    @Override
    public List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data) {
        if (data.size() > 0) {
            final BulkWriteOp op = new BulkWriteOp(stream, data);
            sendRequest(op);
            return op.result();
        } else {
            return Collections.emptyList();
        }
    }

    @Override
    public Future<Boolean> truncate(String stream, DLSN dlsn) {
        final TruncateOp op = new TruncateOp(stream, dlsn);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Future<Void> delete(String stream) {
        final DeleteOp op = new DeleteOp(stream);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Future<Void> release(String stream) {
        final ReleaseOp op = new ReleaseOp(stream);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Future<Void> create(String stream) {
        final CreateOp op = new CreateOp(stream);
        sendRequest(op);
        return op.result();
    }

    private void sendRequest(final StreamOp op) {
        closeLock.readLock().lock();
        try {
            if (closed) {
                op.fail(null, new DLClientClosedException("Client " + clientName + " is closed."));
            } else {
                doSend(op, null);
            }
        } finally {
            closeLock.readLock().unlock();
        }
    }

    /**
     * Send the stream operation by routing service, excluding previous address if it is not null.
     *
     * @param op
     *          stream operation.
     * @param previousAddr
     *          previous tried address.
     */
    private void doSend(final StreamOp op, final SocketAddress previousAddr) {
        if (null != previousAddr) {
            op.routingContext.addTriedHost(previousAddr, StatusCode.WRITE_EXCEPTION);
        }
        // Get host first
        final SocketAddress address = ownershipCache.getOwner(op.stream);
        if (null == address || op.routingContext.isTriedHost(address)) {
            getOwner(op).addEventListener(new FutureEventListener<SocketAddress>() {
                @Override
                public void onFailure(Throwable cause) {
                    op.fail(null, cause);
                }

                @Override
                public void onSuccess(SocketAddress ownerAddr) {
                    op.send(ownerAddr);
                }
            });
        } else {
            op.send(address);
        }
    }

    private void retryGetOwnerFromResourcePlacementServer(final StreamOp op,
                                                final Promise<SocketAddress> getOwnerPromise,
                                                final Throwable cause) {
        if (op.shouldTimeout()) {
            op.fail(null, cause);
            return;
        }
        getOwnerFromResourcePlacementServer(op, getOwnerPromise);
    }

    private void getOwnerFromResourcePlacementServer(final StreamOp op,
                                                     final Promise<SocketAddress> getOwnerPromise) {
        clusterClient.get().getService().getOwner(op.stream, op.ctx)
            .addEventListener(new FutureEventListener<WriteResponse>() {
                @Override
                public void onFailure(Throwable cause) {
                    getOwnerPromise.updateIfEmpty(new Throw<SocketAddress>(cause));
                }

                @Override
                public void onSuccess(WriteResponse value) {
                    if (StatusCode.FOUND == value.getHeader().getCode()
                          && null != value.getHeader().getLocation()) {
                        try {
                            InetSocketAddress addr = DLSocketAddress.deserialize(
                                value.getHeader().getLocation()
                            ).getSocketAddress();
                            getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
                        } catch (IOException e) {
                            // retry from the routing server again
                            logger.error("ERROR in getOwner", e);
                            retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, e);
                            return;
                        }
                    } else {
                        // retry from the routing server again
                        retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise,
                                new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown"));
                    }
                }
            });
    }

    private Future<SocketAddress> getOwner(final StreamOp op) {
        if (clusterClient.isPresent()) {
            final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>();
            getOwnerFromResourcePlacementServer(op, getOwnerPromise);
            return getOwnerPromise;
        }
        // pickup host by hashing
        try {
            return Future.value(routingService.getHost(op.stream, op.routingContext));
        } catch (NoBrokersAvailableException nbae) {
            return Future.exception(nbae);
        }
    }

    private void sendWriteRequest(final SocketAddress addr, final StreamOp op) {
        // Get corresponding finagle client
        final ProxyClient sc = clientManager.getClient(addr);
        final long startTimeNanos = System.nanoTime();
        // write the request to that host.
        op.sendRequest(sc).addEventListener(new FutureEventListener<ResponseHeader>() {
            @Override
            public void onSuccess(ResponseHeader header) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Received response; header: {}", header);
                }
                clientStats.completeProxyRequest(addr, header.getCode(), startTimeNanos);
                // update routing context
                op.routingContext.addTriedHost(addr, header.getCode());
                switch (header.getCode()) {
                    case SUCCESS:
                        // success handling is done per stream op
                        break;
                    case FOUND:
                        handleRedirectResponse(header, op, addr);
                        break;
                    // for overcapacity, dont report failure since this normally happens quite a bit
                    case OVER_CAPACITY:
                        logger.debug("Failed to write request to {} : {}", op.stream, header);
                        op.fail(addr, DLException.of(header));
                        break;
                    // for responses that indicate the requests definitely failed,
                    // we should fail them immediately (e.g. TOO_LARGE_RECORD, METADATA_EXCEPTION)
                    case NOT_IMPLEMENTED:
                    case METADATA_EXCEPTION:
                    case LOG_EMPTY:
                    case LOG_NOT_FOUND:
                    case TRUNCATED_TRANSACTION:
                    case END_OF_STREAM:
                    case TRANSACTION_OUT_OF_ORDER:
                    case INVALID_STREAM_NAME:
                    case REQUEST_DENIED:
                    case TOO_LARGE_RECORD:
                    case CHECKSUM_FAILED:
                    // status code NOT_READY is returned if failfast is enabled in the server. don't redirect
                    // since the proxy may still own the stream.
                    case STREAM_NOT_READY:
                        op.fail(addr, DLException.of(header));
                        break;
                    case SERVICE_UNAVAILABLE:
                        handleServiceUnavailable(addr, sc, Optional.of(op));
                        break;
                    case REGION_UNAVAILABLE:
                        // region is unavailable, redirect the request to hosts in other region
                        redirect(op, null);
                        break;
                    // Proxy was overloaded and refused to try to acquire the stream. Don't remove ownership, since
                    // we didn't have it in the first place.
                    case TOO_MANY_STREAMS:
                        handleRedirectableError(addr, op, header);
                        break;
                    case STREAM_UNAVAILABLE:
                    case ZOOKEEPER_ERROR:
                    case LOCKING_EXCEPTION:
                    case UNEXPECTED:
                    case INTERRUPTED:
                    case BK_TRANSMIT_ERROR:
                    case FLUSH_TIMEOUT:
                    default:
                        // when we are receiving these exceptions from proxy, it means proxy or the stream is closed
                        // redirect the request.
                        ownershipCache.removeOwnerFromStream(op.stream, addr, header.getCode().name());
                        handleRedirectableError(addr, op, header);
                        break;
                }
            }

            @Override
            public void onFailure(Throwable cause) {
                Optional<StreamOp> opOptional = Optional.of(op);
                cause = showRootCause(opOptional, cause);
                clientStats.failProxyRequest(addr, cause, startTimeNanos);
                handleRequestException(addr, sc, opOptional, cause);
            }
        });
    }

    // Response Handlers

    Throwable showRootCause(Optional<StreamOp> op, Throwable cause) {
        if (cause instanceof Failure) {
            Failure failure = (Failure) cause;
            if (failure.isFlagged(Failure.Wrapped())) {
                try {
                    // if it is a wrapped failure, unwrap it first
                    cause = failure.show();
                } catch (IllegalArgumentException iae) {
                    if (op.isPresent()) {
                        logger.warn("Failed to unwrap finagle failure of stream {} : ", op.get().stream, iae);
                    } else {
                        logger.warn("Failed to unwrap finagle failure : ", iae);
                    }
                }
            }
        }
        return cause;
    }

    private void handleRedirectableError(SocketAddress addr,
                                         StreamOp op,
                                         ResponseHeader header) {
        if (streamFailfast) {
            op.fail(addr, DLException.of(header));
        } else {
            redirect(op, null);
        }
    }

    void handleServiceUnavailable(SocketAddress addr,
                                  ProxyClient sc,
                                  Optional<StreamOp> op) {
        // service is unavailable, remove it out of routing service
        routingService.removeHost(addr, new ServiceUnavailableException(addr + " is unavailable now."));
        onServerLeft(addr);
        if (op.isPresent()) {
            ownershipCache.removeOwnerFromStream(op.get().stream, addr, addr + " is unavailable now.");
            // redirect the request to other host.
            redirect(op.get(), null);
        }
    }

    void handleRequestException(SocketAddress addr,
                                ProxyClient sc,
                                Optional<StreamOp> op,
                                Throwable cause) {
        boolean resendOp = false;
        boolean removeOwnerFromStream = false;
        SocketAddress previousAddr = addr;
        String reason = cause.getMessage();
        if (cause instanceof ConnectionFailedException || cause instanceof java.net.ConnectException) {
            routingService.removeHost(addr, cause);
            onServerLeft(addr, sc);
            removeOwnerFromStream = true;
            // redirect the request to other host.
            resendOp = true;
        } else if (cause instanceof ChannelException) {
            // java.net.ConnectException typically means connection is refused remotely
            // no process listening on remote address/port.
            if (cause.getCause() instanceof java.net.ConnectException) {
                routingService.removeHost(addr, cause.getCause());
                onServerLeft(addr);
                reason = cause.getCause().getMessage();
            } else {
                routingService.removeHost(addr, cause);
                reason = cause.getMessage();
            }
            removeOwnerFromStream = true;
            // redirect the request to other host.
            resendOp = true;
        } else if (cause instanceof ServiceTimeoutException) {
            // redirect the request to itself again, which will backoff for a while
            resendOp = true;
            previousAddr = null;
        } else if (cause instanceof WriteException) {
            // redirect the request to other host.
            resendOp = true;
        } else if (cause instanceof ServiceException) {
            // redirect the request to other host.
            clientManager.removeClient(addr, sc);
            resendOp = true;
        } else if (cause instanceof TApplicationException) {
            handleTApplicationException(cause, op, addr, sc);
        } else if (cause instanceof Failure) {
            handleFinagleFailure((Failure) cause, op, addr);
        } else {
            // Default handler
            handleException(cause, op, addr);
        }

        if (op.isPresent()) {
            if (removeOwnerFromStream) {
                ownershipCache.removeOwnerFromStream(op.get().stream, addr, reason);
            }
            if (resendOp) {
                doSend(op.get(), previousAddr);
            }
        }
    }

    /**
     * Redirect the request to new proxy <i>newAddr</i>. If <i>newAddr</i> is null,
     * it would pick up a host from routing service.
     *
     * @param op
     *          stream operation
     * @param newAddr
     *          new proxy address
     */
    void redirect(StreamOp op, SocketAddress newAddr) {
        ownershipCache.getOwnershipStatsLogger().onRedirect(op.stream);
        if (null != newAddr) {
            logger.debug("Redirect request {} to new owner {}.", op, newAddr);
            op.send(newAddr);
        } else {
            doSend(op, null);
        }
    }

    void handleFinagleFailure(Failure failure,
                              Optional<StreamOp> op,
                              SocketAddress addr) {
        if (failure.isFlagged(Failure.Restartable())) {
            if (op.isPresent()) {
                // redirect the request to other host
                doSend(op.get(), addr);
            }
        } else {
            // fail the request if it is other types of failures
            handleException(failure, op, addr);
        }
    }

    void handleException(Throwable cause,
                         Optional<StreamOp> op,
                         SocketAddress addr) {
        // RequestTimeoutException: fail it and let client decide whether to retry or not.

        // FailedFastException:
        // We don't actually know when FailedFastException will be thrown
        // so properly we just throw it back to application to let application
        // handle it.

        // Other Exceptions: as we don't know how to handle them properly so throw them to client
        if (op.isPresent()) {
            logger.error("Failed to write request to {} @ {} : {}",
                    new Object[]{op.get().stream, addr, cause.toString()});
            op.get().fail(addr, cause);
        }
    }

    void handleTApplicationException(Throwable cause,
                                     Optional<StreamOp> op,
                                     SocketAddress addr,
                                     ProxyClient sc) {
        TApplicationException ex = (TApplicationException) cause;
        if (ex.getType() == TApplicationException.UNKNOWN_METHOD) {
            // if we encountered unknown method exception on thrift server, it means this proxy
            // has problem. we should remove it from routing service, clean up ownerships
            routingService.removeHost(addr, cause);
            onServerLeft(addr, sc);
            if (op.isPresent()) {
                ownershipCache.removeOwnerFromStream(op.get().stream, addr, cause.getMessage());
                doSend(op.get(), addr);
            }
        } else {
            handleException(cause, op, addr);
        }
    }

    void handleRedirectResponse(ResponseHeader header, StreamOp op, SocketAddress curAddr) {
        SocketAddress ownerAddr = null;
        if (header.isSetLocation()) {
            String owner = header.getLocation();
            try {
                ownerAddr = DLSocketAddress.deserialize(owner).getSocketAddress();
                // if we are receiving a direct request to same host, we won't try the same host.
                // as the proxy will shut itself down if it redirects client to itself.
                if (curAddr.equals(ownerAddr)) {
                    logger.warn("Request to stream {} is redirected to same server {}!", op.stream, curAddr);
                    ownerAddr = null;
                } else {
                    // update ownership when redirects.
                    ownershipCache.updateOwner(op.stream, ownerAddr);
                }
            } catch (IOException e) {
                ownerAddr = null;
            }
        }
        redirect(op, ownerAddr);
    }

    void updateOwnership(String stream, String location) {
        try {
            SocketAddress ownerAddr = DLSocketAddress.deserialize(location).getSocketAddress();
            // update ownership
            ownershipCache.updateOwner(stream, ownerAddr);
        } catch (IOException e) {
            logger.warn("Invalid ownership {} found for stream {} : ",
                new Object[] { location, stream, e });
        }
    }

}
