/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.service;

import static com.google.common.base.Charsets.UTF_8;
import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.base.Optional;
import org.apache.distributedlog.AsyncLogReader;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogManager;
import org.apache.distributedlog.LogReader;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.acl.AccessControlManager;
import org.apache.distributedlog.annotations.DistributedLogAnnotations;
import org.apache.distributedlog.client.routing.LocalRoutingService;
import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.impl.acl.ZKAccessControl;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.service.stream.StreamManagerImpl;
import org.apache.distributedlog.thrift.AccessControlEntry;
import org.apache.distributedlog.thrift.service.BulkWriteResponse;
import org.apache.distributedlog.thrift.service.HeartbeatOptions;
import org.apache.distributedlog.thrift.service.StatusCode;
import org.apache.distributedlog.thrift.service.WriteContext;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.FutureUtils;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.Futures;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test Case for {@link DistributedLogServer}.
 */
public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase {

    /** Logger shared by all tests in this class. */
    private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class);

    /** JUnit rule that records the name of the test method being run. */
    @Rule
    public TestName testName = new TestName();

    /**
     * Creates the test case.
     *
     * @param clientSideRouting forwarded unchanged to the superclass constructor; presumably selects
     *                          whether the test client routes requests itself -- confirm against
     *                          {@code DistributedLogServerTestCase}
     */
    protected TestDistributedLogServerBase(boolean clientSideRouting) {
        super(clientSideRouting);
    }

    /**
     * Writes ten records through the proxy and verifies that they can be read back in order.
     *
     * <p>Currently ignored as flaky; see
     * <a href="https://issues.apache.org/jira/browse/DL-27">DL-27</a>.
     */
    @DistributedLogAnnotations.FlakyTest
    @Ignore
    @Test(timeout = 60000)
    public void testBasicWrite() throws Exception {
        String name = "dlserver-basic-write";

        dlClient.routingService.addHost(name, dlServer.getAddress());

        for (long i = 1; i <= 10; i++) {
            logger.debug("Write entry {} to stream {}.", i, name);
            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
        }

        // NOTE(review): these options are built but never handed to the heartbeat call below.
        HeartbeatOptions hbOptions = new HeartbeatOptions();
        hbOptions.setSendHeartBeatToReader(true);
        // make sure the first log segment of each stream created
        FutureUtils.result(dlClient.dlClient.heartbeat(name));

        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
        try {
            LogReader reader = dlm.getInputStream(1);
            try {
                int numRead = 0;
                LogRecord r = reader.readNext(false);
                while (null != r) {
                    ++numRead;
                    // Payloads are the decimal sequence numbers written above.
                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
                    assertEquals(numRead, i);
                    r = reader.readNext(false);
                }
                assertEquals(10, numRead);
            } finally {
                // Release the reader even when an assertion above fails.
                reader.close();
            }
        } finally {
            dlm.close();
        }
    }

    /**
     * Sanity check to make sure both checksum flag values work: one record is written by a client
     * built with checksums disabled and one by a client built with checksums enabled.
     */
    @Test(timeout = 60000)
    public void testChecksumFlag() throws Exception {
        String name = "testChecksumFlag";
        LocalRoutingService routingService = LocalRoutingService.newBuilder().build();
        routingService.addHost(name, dlServer.getAddress());
        DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder()
            .name(name)
            .clientId(ClientId$.MODULE$.apply("test"))
            .routingService(routingService)
            .handshakeWithClientInfo(true)
            .clientBuilder(ClientBuilder.get()
                .hostConnectionLimit(1)
                .connectionTimeout(Duration.fromSeconds(1))
                .requestTimeout(Duration.fromSeconds(60)))
            .checksum(false);

        // Locals are named distinctly so they do not shadow the dlClient field used by other tests.
        DistributedLogClient noChecksumClient = dlClientBuilder.build();
        try {
            Await.result(noChecksumClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
        } finally {
            // Close the client even if the write fails.
            noChecksumClient.close();
        }

        DistributedLogClient checksumClient = dlClientBuilder.checksum(true).build();
        try {
            Await.result(checksumClient.write(name, ByteBuffer.wrap("2".getBytes(UTF_8))));
        } finally {
            checksumClient.close();
        }
    }

    /**
     * Bulk-writes the payloads {@code "1"} through {@code writeCount} to a stream named after the
     * count, then reads the stream back and verifies the records arrive complete and in order.
     *
     * @param writeCount number of records to submit in the single bulk request
     * @throws Exception if a write fails or does not complete within 10 seconds, or if reading fails
     */
    private void runSimpleBulkWriteTest(int writeCount) throws Exception {
        String name = String.format("dlserver-bulk-write-%d", writeCount);

        dlClient.routingService.addHost(name, dlServer.getAddress());

        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount);
        for (long i = 1; i <= writeCount; i++) {
            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
        }

        logger.debug("Write {} entries to stream {}.", writeCount, name);
        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
        assertEquals(writeCount, futures.size());
        for (Future<DLSN> future : futures) {
            // No throw == pass.
            Await.result(future, Duration.fromSeconds(10));
        }

        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
        try {
            LogReader reader = dlm.getInputStream(1);
            try {
                int numRead = 0;
                LogRecord r = reader.readNext(false);
                while (null != r) {
                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
                    assertEquals(numRead + 1, i);
                    ++numRead;
                    r = reader.readNext(false);
                }
                assertEquals(writeCount, numRead);
            } finally {
                // Release the reader even when an assertion above fails.
                reader.close();
            }
        } finally {
            dlm.close();
        }
    }

    /**
     * Runs the simple bulk-write round trip with 100 records.
     */
    @Test(timeout = 60000)
    public void testBulkWrite() throws Exception {
        runSimpleBulkWriteTest(100);
    }

    /**
     * Runs the simple bulk-write round trip with a single record.
     */
    @Test(timeout = 60000)
    public void testBulkWriteSingleWrite() throws Exception {
        runSimpleBulkWriteTest(1);
    }

    /**
     * A bulk write of an empty list must hand back an empty list of futures.
     */
    @Test(timeout = 60000)
    public void testBulkWriteEmptyList() throws Exception {
        final String streamName = String.format("dlserver-bulk-write-%d", 0);
        dlClient.routingService.addHost(streamName, dlServer.getAddress());

        List<Future<DLSN>> results =
                dlClient.dlClient.writeBulk(streamName, new ArrayList<ByteBuffer>());

        assertEquals(0, results.size());
    }

    /**
     * A bulk write whose list contains a {@code null} buffer must be rejected with a
     * {@link NullPointerException} thrown directly from the client call.
     */
    @Test(timeout = 60000)
    public void testBulkWriteNullArg() throws Exception {
        final String streamName = String.format("dlserver-bulk-write-%s", "null");
        dlClient.routingService.addHost(streamName, dlServer.getAddress());

        List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
        payloads.add(null);

        boolean rejected = false;
        try {
            dlClient.dlClient.writeBulk(streamName, payloads);
        } catch (NullPointerException expected) {
            rejected = true;
            logger.info("Expected to catch NullPointException.");
        }

        if (!rejected) {
            fail("should not have succeeded");
        }
    }

    /**
     * Bulk-writes two zero-length payloads; both writes must complete without an exception.
     */
    @Test(timeout = 60000)
    public void testBulkWriteEmptyBuffer() throws Exception {
        final String streamName = String.format("dlserver-bulk-write-%s", "empty");
        dlClient.routingService.addHost(streamName, dlServer.getAddress());

        final int emptyRecords = 2;
        List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
        for (int n = 0; n < emptyRecords; n++) {
            payloads.add(ByteBuffer.wrap(("").getBytes()));
        }

        List<Future<DLSN>> results = dlClient.dlClient.writeBulk(streamName, payloads);
        assertEquals(2, results.size());

        // Awaiting without an exception is the pass criterion; the returned DLSNs are not inspected.
        for (Future<DLSN> result : results) {
            Await.result(result, Duration.fromSeconds(10));
        }
    }

    /**
     * Fails the running test because an awaited future failed with an unexpected exception type.
     *
     * <p>The exception is logged under the name of the running test method (taken from the
     * {@code testName} rule) and attached as the cause of the thrown {@link AssertionError}, so its
     * stack trace is kept in the test report.
     *
     * @param ex the unexpected exception
     * @throws AssertionError always
     */
    void failDueToWrongException(Exception ex) {
        logger.info(testName.getMethodName() + ": ", ex);
        throw new AssertionError(
            String.format("failed with wrong exception %s", ex.getClass().getName()), ex);
    }

    /**
     * Awaits the futures at indices {@code [start, finish)} and requires each of them to fail with
     * a {@link DLException}. The exception's status code is not inspected.
     *
     * @param futures futures returned by a bulk write
     * @param start   index of the first future to check (inclusive)
     * @param finish  index one past the last future to check (exclusive)
     * @return the number of futures that failed with a {@link DLException}
     */
    int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) {
        int numFailed = 0;
        for (int idx = start; idx < finish; ++idx) {
            Future<DLSN> pending = futures.get(idx);
            boolean completed = false;
            try {
                Await.result(pending, Duration.fromSeconds(10));
                completed = true;
            } catch (DLException expected) {
                numFailed++;
            } catch (Exception unexpected) {
                failDueToWrongException(unexpected);
            }
            if (completed) {
                fail("future should have failed!");
            }
        }
        return numFailed;
    }

    /**
     * Awaits the given future and requires it to fail with a {@link DLException} whose code is
     * {@code TOO_LARGE_RECORD}. Success, or any other exception type, fails the test.
     *
     * @param future the write future expected to be rejected as too large
     */
    void validateFailedAsLogRecordTooLong(Future<DLSN> future) {
        boolean completed = false;
        try {
            Await.result(future, Duration.fromSeconds(10));
            completed = true;
        } catch (DLException tooLarge) {
            assertEquals(StatusCode.TOO_LARGE_RECORD.getValue(), tooLarge.getCode());
        } catch (Exception unexpected) {
            failDueToWrongException(unexpected);
        }
        if (completed) {
            fail("should have failed");
        }
    }

    /**
     * Bulk-writes 100 valid records, one record larger than {@code MAX_LOGRECORD_SIZE}, and another
     * 100 valid records. Only the oversized record is expected to fail (with
     * {@code TOO_LARGE_RECORD}); the records before and after it must all succeed.
     */
    @Test(timeout = 60000)
    public void testBulkWritePartialFailure() throws Exception {
        String name = String.format("dlserver-bulk-write-%s", "partial-failure");

        dlClient.routingService.addHost(name, dlServer.getAddress());

        final int writeCount = 100;

        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1);
        for (long i = 1; i <= writeCount; i++) {
            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
        }
        // Too big: this single record is expected to be rejected.
        ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
        writes.add(buf);
        for (long i = 1; i <= writeCount; i++) {
            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
        }

        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
        // Guard the index arithmetic below with a readable failure instead of an index error.
        assertEquals(writes.size(), futures.size());

        // Count succeeded.
        int succeeded = 0;
        for (int i = 0; i < writeCount; i++) {
            Future<DLSN> future = futures.get(i);
            try {
                Await.result(future, Duration.fromSeconds(10));
                ++succeeded;
            } catch (Exception ex) {
                failDueToWrongException(ex);
            }
        }

        validateFailedAsLogRecordTooLong(futures.get(writeCount));
        // Every write after the oversized record must still succeed.
        FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1)));
        assertEquals(writeCount, succeeded);
    }

    /**
     * Bulk-writes one record larger than {@code MAX_LOGRECORD_SIZE} followed by 100 valid records.
     * The first record must fail with {@code TOO_LARGE_RECORD}; the remaining writes are awaited
     * and must complete without an exception.
     */
    @Test(timeout = 60000)
    public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception {
        String name = String.format("dlserver-bulk-write-%s", "first-write-failed");

        dlClient.routingService.addHost(name, dlServer.getAddress());

        final int writeCount = 100;
        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
        ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
        writes.add(buf);
        for (long i = 1; i <= writeCount; i++) {
            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
        }

        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
        // Guard the index arithmetic below with a readable failure instead of an index error.
        assertEquals(writes.size(), futures.size());
        validateFailedAsLogRecordTooLong(futures.get(0));
        FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1)));
    }

    /**
     * With the {@code FP_WriteInternalLostLock} failpoint set, a bulk write issued directly against
     * the service implementation must report {@code LOCKING_EXCEPTION} in the response header.
     */
    @Test(timeout = 60000)
    public void testBulkWriteTotalFailureLostLock() throws Exception {
        String name = String.format("dlserver-bulk-write-%s", "lost-lock");

        dlClient.routingService.addHost(name, dlServer.getAddress());

        final int writeCount = 8;
        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
        writes.add(ByteBuffer.allocate(8));
        for (long i = 1; i <= writeCount; i++) {
            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
        }
        // Warm it up with a write.
        Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8)));

        // Failpoint a lost lock, make sure the failure gets promoted to an operation failure.
        DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft();
        try {
            FailpointUtils.setFailpoint(
                FailpointUtils.FailPointName.FP_WriteInternalLostLock,
                FailpointUtils.FailPointActions.FailPointAction_Default
            );
            Future<BulkWriteResponse> response =
                svcImpl.writeBulkWithContext(name, writes, new WriteContext());
            assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(response).header.code);
        } finally {
            // Always clear the failpoint so it cannot leak into other tests.
            FailpointUtils.removeFailpoint(
                FailpointUtils.FailPointName.FP_WriteInternalLostLock
            );
        }
    }

    /**
     * Sends ten heartbeats followed by one user record, then verifies that the stream contains
     * exactly that one user record at DLSN {@code (1, 1, 0)}.
     */
    @Test(timeout = 60000)
    public void testHeartbeat() throws Exception {
        String name = "dlserver-heartbeat";

        dlClient.routingService.addHost(name, dlServer.getAddress());

        for (long i = 1; i <= 10; i++) {
            logger.debug("Send heartbeat {} to stream {}.", i, name);
            dlClient.dlClient.check(name).get();
        }

        logger.debug("Write entry one to stream {}.", name);
        dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))).get();

        Thread.sleep(1000);

        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
        try {
            LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
            try {
                int numRead = 0;
                // eid=0 => control records
                // other 9 heartbeats will not trigger writing any control records.
                // eid=1 => user entry
                long startEntryId = 1;
                LogRecordWithDLSN r = reader.readNext(false);
                while (null != r) {
                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
                    assertEquals(numRead + 1, i);
                    assertEquals(0, r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)));
                    ++numRead;
                    ++startEntryId;
                    r = reader.readNext(false);
                }
                assertEquals(1, numRead);
            } finally {
                // The reader and manager were previously never closed.
                reader.close();
            }
        } finally {
            dlm.close();
        }
    }

    /**
     * Writes ten records, fences the stream through {@code DLMTestUtil.fenceStream}, writes ten
     * more records through the same client, and verifies all twenty can be read back in order.
     */
    @Test(timeout = 60000)
    public void testFenceWrite() throws Exception {
        String name = "dlserver-fence-write";

        dlClient.routingService.addHost(name, dlServer.getAddress());

        for (long i = 1; i <= 10; i++) {
            logger.debug("Write entry {} to stream {}.", i, name);
            dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))).get();
        }

        Thread.sleep(1000);

        logger.info("Fencing stream {}.", name);
        DLMTestUtil.fenceStream(conf, getUri(), name);
        logger.info("Fenced stream {}.", name);

        for (long i = 11; i <= 20; i++) {
            logger.debug("Write entry {} to stream {}.", i, name);
            dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))).get();
        }

        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
        try {
            LogReader reader = dlm.getInputStream(1);
            try {
                int numRead = 0;
                LogRecord r = reader.readNext(false);
                while (null != r) {
                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
                    assertEquals(numRead + 1, i);
                    ++numRead;
                    r = reader.readNext(false);
                }
                assertEquals(20, numRead);
            } finally {
                // Release the reader even when an assertion above fails.
                reader.close();
            }
        } finally {
            dlm.close();
        }
    }

    /**
     * Verifies that deleting a stream drops it from the client and server caches, that reading the
     * deleted stream fails with {@link LogNotFoundException}, and that the stream can be written
     * to and read from again afterwards.
     */
    @Test(timeout = 60000)
    public void testDeleteStream() throws Exception {
        String name = "dlserver-delete-stream";

        dlClient.routingService.addHost(name, dlServer.getAddress());

        long txid = 101;
        for (long i = 1; i <= 10; i++) {
            long curTxId = txid++;
            logger.debug("Write entry {} to stream {}.", curTxId, name);
            dlClient.dlClient.write(name,
                    ByteBuffer.wrap(("" + curTxId).getBytes(UTF_8))).get();
        }

        checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);

        dlClient.dlClient.delete(name).get();

        checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);

        Thread.sleep(1000);

        DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri());
        try {
            AsyncLogReader reader101 = FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN));
            try {
                FutureUtils.result(reader101.readNext());
                fail("Should fail with LogNotFoundException since the stream is deleted");
            } catch (LogNotFoundException lnfe) {
                // expected
            } finally {
                // Close the reader on every path, including unexpected failures.
                FutureUtils.result(reader101.asyncClose());
            }
        } finally {
            dlm101.close();
        }

        txid = 201;
        for (long i = 1; i <= 10; i++) {
            long curTxId = txid++;
            logger.debug("Write entry {} to stream {}.", curTxId, name);
            dlClient.dlClient.write(name,
                    ByteBuffer.wrap(("" + curTxId).getBytes(UTF_8))).get();
        }
        Thread.sleep(1000);

        DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri());
        try {
            LogReader reader201 = dlm201.getInputStream(1);
            try {
                int numRead = 0;
                // Only the records written after the delete (201..210) may be visible.
                int expectedTxId = 201;
                LogRecord r = reader201.readNext(false);
                while (null != r) {
                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
                    assertEquals(expectedTxId++, i);
                    ++numRead;
                    r = reader201.readNext(false);
                }
                assertEquals(10, numRead);
            } finally {
                reader201.close();
            }
        } finally {
            dlm201.close();
        }
    }

    /**
     * On the no-ad-hoc cluster, verifies that a stream is not acquired before it is created, that
     * an explicit create succeeds, and that the stream is acquired after records are written.
     */
    @Test(timeout = 60000)
    public void testCreateStream() throws Exception {
        try {
            setupNoAdHocCluster();
            final String name = "dlserver-create-stream";

            // Route by the same variable used everywhere else instead of a duplicated literal.
            noAdHocClient.routingService.addHost(name, noAdHocServer.getAddress());
            assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());

            long txid = 101;
            for (long i = 1; i <= 10; i++) {
                long curTxId = txid++;
                logger.debug("Write entry {} to stream {}.", curTxId, name);
                noAdHocClient.dlClient.write(name,
                    ByteBuffer.wrap(("" + curTxId).getBytes(UTF_8))).get();
            }

            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
        } finally {
            tearDownNoAdHocCluster();
        }
    }

    /**
     * This tests that create has touch-like behavior: creating a stream that already exists and is
     * acquired succeeds and leaves the stream acquired.
     */
    @Test(timeout = 60000)
    public void testCreateStreamTwice() throws Exception {
        try {
            setupNoAdHocCluster();
            final String name = "dlserver-create-stream-twice";

            // Route by the same variable used everywhere else instead of a duplicated literal.
            noAdHocClient.routingService.addHost(name, noAdHocServer.getAddress());
            assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());

            long txid = 101;
            for (long i = 1; i <= 10; i++) {
                long curTxId = txid++;
                logger.debug("Write entry {} to stream {}.", curTxId, name);
                noAdHocClient.dlClient.write(name,
                    ByteBuffer.wrap(("" + curTxId).getBytes(UTF_8))).get();
            }

            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));

            // create again
            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
        } finally {
            tearDownNoAdHocCluster();
        }
    }



    /**
     * Writes two batches of ten records (releasing the stream after the first batch), truncates
     * the stream at the DLSN of record 11, and verifies that a reader sees exactly records 11..20.
     */
    @Test(timeout = 60000)
    public void testTruncateStream() throws Exception {
        String name = "dlserver-truncate-stream";

        dlClient.routingService.addHost(name, dlServer.getAddress());

        long txid = 1;
        Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>();
        for (int s = 1; s <= 2; s++) {
            for (long i = 1; i <= 10; i++) {
                long curTxId = txid++;
                logger.debug("Write entry {} to stream {}.", curTxId, name);
                DLSN dlsn = dlClient.dlClient.write(name,
                        ByteBuffer.wrap(("" + curTxId).getBytes(UTF_8))).get();
                txid2DLSN.put(curTxId, dlsn);
            }
            if (s == 1) {
                dlClient.dlClient.release(name).get();
            }
        }

        DLSN dlsnToDelete = txid2DLSN.get(11L);
        dlClient.dlClient.truncate(name, dlsnToDelete).get();

        DistributedLogManager readDLM = DLMTestUtil.createNewDLM(name, conf, getUri());
        try {
            LogReader reader = readDLM.getInputStream(1);
            try {
                int numRead = 0;
                int expectedTxId = 11;
                LogRecord r = reader.readNext(false);
                while (null != r) {
                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
                    assertEquals(expectedTxId++, i);
                    ++numRead;
                    r = reader.readNext(false);
                }
                assertEquals(10, numRead);
            } finally {
                // Release the reader even when an assertion above fails.
                reader.close();
            }
        } finally {
            readDLM.close();
        }
    }

    /**
     * Creates a deny-write access control entry for the stream in ZooKeeper, waits until the access
     * control manager of the server's namespace stops allowing writes, and then verifies that a
     * write through the proxy is rejected with {@code REQUEST_DENIED}.
     */
    @Test(timeout = 60000)
    public void testRequestDenied() throws Exception {
        String name = "request-denied";

        dlClient.routingService.addHost(name, dlServer.getAddress());

        AccessControlEntry ace = new AccessControlEntry();
        ace.setDenyWrite(true);
        ZooKeeperClient zkc = TestZooKeeperClientBuilder
                .newBuilder()
                .uri(getUri())
                .connectionTimeoutMs(60000)
                .sessionTimeoutMs(60000)
                .build();
        try {
            DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace();
            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri());
            String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name;
            ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath);
            accessControl.create(zkc);

            // Poll until the deny entry is visible; the test timeout bounds this loop.
            AccessControlManager acm = dlNamespace.createAccessControlManager();
            while (acm.allowWrite(name)) {
                Thread.sleep(100);
            }

            try {
                Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
                fail("Should fail with request denied exception");
            } catch (DLException dle) {
                assertEquals(StatusCode.REQUEST_DENIED.getValue(), dle.getCode());
            }
        } finally {
            // This ZooKeeper client is built only for this test and was previously never closed.
            zkc.close();
        }
    }

    /**
     * With the catch-all regex {@code ".*"}, every written stream is expected in the client's
     * ownership cache.
     */
    @Test(timeout = 60000)
    public void testNoneStreamNameRegex() throws Exception {
        final String prefix = "none-stream-name-regex-";
        final int streamCount = 5;

        Set<String> allStreams = new HashSet<String>();
        int idx = 0;
        while (idx < streamCount) {
            allStreams.add(prefix + idx);
            idx++;
        }

        testStreamNameRegex(allStreams, ".*", allStreams);
    }

    /**
     * With a regex that matches only the stream suffixed {@code 1}, only that stream is expected in
     * the client's ownership cache even though five streams are written.
     */
    @Test(timeout = 60000)
    public void testStreamNameRegex() throws Exception {
        final String prefix = "stream-name-regex-";
        final int streamCount = 5;
        final String onlyMatch = prefix + "1";

        Set<String> allStreams = new HashSet<String>();
        int idx = 0;
        while (idx < streamCount) {
            allStreams.add(prefix + idx);
            idx++;
        }

        Set<String> matching = new HashSet<String>();
        matching.add(onlyMatch);

        testStreamNameRegex(allStreams, onlyMatch, matching);
    }

    /**
     * Writes one record to each given stream through the shared client, then handshakes a fresh
     * client configured with {@code streamNameRegex} and checks its stream-ownership distribution.
     *
     * @param streams         streams to write to before the handshake
     * @param streamNameRegex regex handed to the newly created client
     * @param expectedStreams streams the new client is expected to have cached after the handshake
     * @throws Exception if a write, the handshake, or an assertion fails
     */
    private void testStreamNameRegex(Set<String> streams, String streamNameRegex,
                                     Set<String> expectedStreams)
            throws Exception {
        for (String stream : streams) {
            dlClient.routingService.addHost(stream, dlServer.getAddress());
            ByteBuffer payload = ByteBuffer.wrap(stream.getBytes(UTF_8));
            Await.result(dlClient.dlClient.write(stream, payload));
        }

        DLClient regexClient = createDistributedLogClient(
                "test-stream-name-regex",
                streamNameRegex,
                Optional.<String>absent());
        try {
            regexClient.routingService.addHost("unknown", dlServer.getAddress());
            regexClient.handshake();

            Map<SocketAddress, Set<String>> ownership =
                    regexClient.dlClient.getStreamOwnershipDistribution();
            assertEquals(1, ownership.size());

            Set<String> owned = ownership.values().iterator().next();
            assertNotNull(owned);
            assertEquals(expectedStreams.size(), owned.size());
            // Same size plus containment means the two sets are identical.
            assertTrue(expectedStreams.containsAll(owned));
        } finally {
            regexClient.shutdown();
        }
    }

    /**
     * A stream is present in the client and server caches after a write and absent from both
     * after an explicit release.
     */
    @Test(timeout = 60000)
    public void testReleaseStream() throws Exception {
        final String streamName = "dlserver-release-stream";
        dlClient.routingService.addHost(streamName, dlServer.getAddress());

        // Writing makes the server take ownership of the stream.
        ByteBuffer payload = ByteBuffer.wrap("1".getBytes(UTF_8));
        Await.result(dlClient.dlClient.write(streamName, payload));
        checkStream(1, 1, 1, streamName, dlServer.getAddress(), true, true);

        // After the release nothing may remain cached on either side.
        Await.result(dlClient.dlClient.release(streamName));
        checkStream(0, 0, 0, streamName, dlServer.getAddress(), false, false);
    }

    /**
     * Asserts the ownership state of a stream as seen by the shared client and by the server's
     * stream manager.
     *
     * @param expectedNumProxiesInClient expected number of entries in the client's stream-ownership
     *                                   distribution
     * @param expectedClientCacheSize    expected number of streams under the first entry of that
     *                                   distribution (checked only when at least one entry is expected)
     * @param expectedServerCacheSize    expected size of both the cached and the acquired stream sets
     *                                   on the server
     * @param name                       stream to look for
     * @param owner                      address expected as the key of the first distribution entry
     * @param existedInServer            whether the server is expected to have the stream cached and
     *                                   acquired
     * @param existedInClient            whether the client is expected to list the stream under
     *                                   {@code owner}
     */
    protected void checkStream(int expectedNumProxiesInClient, int expectedClientCacheSize, int expectedServerCacheSize,
                             String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) {
        Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
        assertEquals(expectedNumProxiesInClient, distribution.size());

        if (expectedNumProxiesInClient > 0) {
            Map.Entry<SocketAddress, Set<String>> localEntry =
                    distribution.entrySet().iterator().next();
            assertEquals(owner, localEntry.getKey());
            assertEquals(expectedClientCacheSize, localEntry.getValue().size());
            assertEquals(existedInClient, localEntry.getValue().contains(name));
        }

        StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
        Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
        // Read the acquired-stream map here; reusing getCachedStreams() made the two
        // acquired-stream assertions below duplicates of the cached-stream ones.
        Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet();

        assertEquals(expectedServerCacheSize, cachedStreams.size());
        assertEquals(existedInServer, cachedStreams.contains(name));
        assertEquals(expectedServerCacheSize, acquiredStreams.size());
        assertEquals(existedInServer, acquiredStreams.contains(name));
    }

}
