/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog;

import org.apache.distributedlog.exceptions.OverCapacityException;
import org.apache.distributedlog.util.PermitLimiter;
import org.apache.distributedlog.util.SimplePermitLimiter;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.SettableFeature;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import scala.runtime.BoxedUnit;

/**
 * Tests for {@link WriteLimiter}, which is built here from a per-stream
 * {@link SimplePermitLimiter} and a global {@link SimplePermitLimiter}.
 *
 * <p>Each test builds the two limiters with a given darkmode flag, permit
 * budget and {@link Feature}, then checks the outstanding permit count on
 * both limiters after sequences of {@code acquire()} / {@code release()}.
 *
 * <p>NOTE(review): the tests construct the feature with availability
 * {@code 0} when limits are expected to be enforced and {@code 10000} when
 * they are expected to be ignored; the exact threshold semantics live in
 * {@code SimplePermitLimiter} and are not visible from this file.
 */
public class TestWriteLimiter {
    static final Logger LOG = LoggerFactory.getLogger(TestWriteLimiter.class);

    /**
     * Builds a limiter backed by a fresh feature with an empty name and
     * availability {@code 0}.
     */
    SimplePermitLimiter createPermitLimiter(boolean darkmode, int permits) {
        Feature defaultFeature = new SettableFeature("", 0);
        return createPermitLimiter(darkmode, permits, defaultFeature);
    }

    /**
     * Builds a limiter with a no-op stats logger and the supplied feature.
     * The fourth constructor argument is always {@code false}; its meaning is
     * defined by {@code SimplePermitLimiter} (not shown here).
     */
    SimplePermitLimiter createPermitLimiter(boolean darkmode, int permits, Feature feature) {
        NullStatsLogger statsLogger = new NullStatsLogger();
        return new SimplePermitLimiter(darkmode, permits, statsLogger, false, feature);
    }

    /** Calls {@code acquire()} on the limiter {@code times} times in a row. */
    private static void acquire(WriteLimiter limiter, int times) throws Exception {
        for (int i = 0; i < times; i++) {
            limiter.acquire();
        }
    }

    /** Calls {@code release()} on the limiter {@code times} times in a row. */
    private static void release(WriteLimiter limiter, int times) throws Exception {
        for (int i = 0; i < times; i++) {
            limiter.release();
        }
    }

    /**
     * Attempts one {@code acquire()} and fails the test with
     * {@code failureMessage} unless it is rejected with an
     * {@link OverCapacityException}.
     */
    private static void expectOverCapacity(WriteLimiter limiter, String failureMessage)
            throws Exception {
        try {
            limiter.acquire();
        } catch (OverCapacityException expected) {
            // The rejection is exactly what the calling test is asserting.
            return;
        }
        fail(failureMessage);
    }

    /** A global budget of one permit rejects the second acquire. */
    @Test(timeout = 60000)
    public void testGlobalOnly() throws Exception {
        SimplePermitLimiter perStream = createPermitLimiter(false, Integer.MAX_VALUE);
        SimplePermitLimiter global = createPermitLimiter(false, 1);
        WriteLimiter limiter = new WriteLimiter("test", perStream, global);

        acquire(limiter, 1);
        expectOverCapacity(limiter, "should have thrown global limit exception");
        // Only the first, successful acquire is still counted on both limiters.
        assertPermits(perStream, 1, global, 1);

        release(limiter, 1);
        assertPermits(perStream, 0, global, 0);
    }

    /** A per-stream budget of one permit rejects the second acquire. */
    @Test(timeout = 60000)
    public void testStreamOnly() throws Exception {
        SimplePermitLimiter perStream = createPermitLimiter(false, 1);
        SimplePermitLimiter global = createPermitLimiter(false, Integer.MAX_VALUE);
        WriteLimiter limiter = new WriteLimiter("test", perStream, global);

        acquire(limiter, 1);
        expectOverCapacity(limiter, "should have thrown stream limit exception");
        assertPermits(perStream, 1, global, 1);
    }

    /** With darkmode on, exceeding the global budget does not reject the acquire. */
    @Test(timeout = 60000)
    public void testDarkmode() throws Exception {
        SimplePermitLimiter perStream = createPermitLimiter(true, Integer.MAX_VALUE);
        SimplePermitLimiter global = createPermitLimiter(true, 1);
        WriteLimiter limiter = new WriteLimiter("test", perStream, global);

        acquire(limiter, 2);
        assertPermits(perStream, 2, global, 2);
    }

    /** Darkmode on and feature at 10000: over-budget acquires succeed and release cleanly. */
    @Test(timeout = 60000)
    public void testDarkmodeWithDisabledFeature() throws Exception {
        SettableFeature feature = new SettableFeature("test", 10000);
        SimplePermitLimiter perStream = createPermitLimiter(true, 1, feature);
        SimplePermitLimiter global = createPermitLimiter(true, Integer.MAX_VALUE, feature);
        WriteLimiter limiter = new WriteLimiter("test", perStream, global);

        acquire(limiter, 2);
        assertPermits(perStream, 2, global, 2);

        release(limiter, 2);
        assertPermits(perStream, 0, global, 0);
    }

    /** Darkmode off but feature at 10000: limits are still ignored. */
    @Test(timeout = 60000)
    public void testDisabledFeature() throws Exception {
        // Darkmode is off here; the limits are expected to be ignored purely
        // because of the feature setting.
        SettableFeature feature = new SettableFeature("test", 10000);
        SimplePermitLimiter perStream = createPermitLimiter(false, 1, feature);
        SimplePermitLimiter global = createPermitLimiter(false, Integer.MAX_VALUE, feature);
        WriteLimiter limiter = new WriteLimiter("test", perStream, global);

        acquire(limiter, 2);
        assertPermits(perStream, 2, global, 2);

        release(limiter, 2);
        assertPermits(perStream, 0, global, 0);
    }

    /** Flipping the feature to 10000 between acquire and release still returns all permits. */
    @Test(timeout = 60000)
    public void testSetDisableFeatureAfterAcquireAndBeforeRelease() throws Exception {
        SettableFeature feature = new SettableFeature("test", 0);
        SimplePermitLimiter perStream = createPermitLimiter(false, 2, feature);
        SimplePermitLimiter global = createPermitLimiter(false, Integer.MAX_VALUE, feature);
        WriteLimiter limiter = new WriteLimiter("test", perStream, global);

        acquire(limiter, 2);
        assertPermits(perStream, 2, global, 2);

        feature.set(10000);
        release(limiter, 2);
        assertPermits(perStream, 0, global, 0);
    }

    /**
     * Permits are over-acquired while the feature is at 10000; once it is set
     * back to 0 the limit is enforced again even though the count is already
     * above the budget.
     */
    @Test(timeout = 60000)
    public void testUnsetDisableFeatureAfterPermitsExceeded() throws Exception {
        SettableFeature feature = new SettableFeature("test", 10000);
        SimplePermitLimiter perStream = createPermitLimiter(false, 1, feature);
        SimplePermitLimiter global = createPermitLimiter(false, Integer.MAX_VALUE, feature);
        WriteLimiter limiter = new WriteLimiter("test", perStream, global);

        acquire(limiter, 4);
        assertPermits(perStream, 4, global, 4);

        feature.set(0);
        release(limiter, 1);
        assertPermits(perStream, 3, global, 3);

        // Three permits are outstanding against a stream budget of one.
        expectOverCapacity(limiter, "should have thrown stream limit exception");
        assertPermits(perStream, 3, global, 3);

        release(limiter, 3);
        assertPermits(perStream, 0, global, 0);
    }

    /** A rejected acquire succeeds on retry once the feature is set to 10000. */
    @Test(timeout = 60000)
    public void testUnsetDisableFeatureBeforePermitsExceeded() throws Exception {
        SettableFeature feature = new SettableFeature("test", 0);
        SimplePermitLimiter perStream = createPermitLimiter(false, 1, feature);
        SimplePermitLimiter global = createPermitLimiter(false, Integer.MAX_VALUE, feature);
        WriteLimiter limiter = new WriteLimiter("test", perStream, global);

        acquire(limiter, 1);
        expectOverCapacity(limiter, "should have thrown stream limit exception");
        assertPermits(perStream, 1, global, 1);

        feature.set(10000);
        acquire(limiter, 1);
        assertPermits(perStream, 2, global, 2);
    }

    /** Darkmode: stream budget exceeded, global budget not; both acquires succeed. */
    @Test(timeout = 60000)
    public void testDarkmodeGlobalUnderStreamOver() throws Exception {
        SimplePermitLimiter perStream = createPermitLimiter(true, 1);
        SimplePermitLimiter global = createPermitLimiter(true, 2);
        WriteLimiter limiter = new WriteLimiter("test", perStream, global);

        acquire(limiter, 2);
        assertPermits(perStream, 2, global, 2);

        release(limiter, 2);
        assertPermits(perStream, 0, global, 0);
    }

    /** Darkmode: global budget exceeded, stream budget not; counts track each release. */
    @Test(timeout = 60000)
    public void testDarkmodeGlobalOverStreamUnder() throws Exception {
        SimplePermitLimiter perStream = createPermitLimiter(true, 2);
        SimplePermitLimiter global = createPermitLimiter(true, 1);
        WriteLimiter limiter = new WriteLimiter("test", perStream, global);

        acquire(limiter, 2);
        assertPermits(perStream, 2, global, 2);

        release(limiter, 1);
        assertPermits(perStream, 1, global, 1);

        release(limiter, 1);
        assertPermits(perStream, 0, global, 0);
    }

    /**
     * Asserts the outstanding permit count reported by each limiter, checking
     * the stream limiter first and the global limiter second.
     */
    void assertPermits(SimplePermitLimiter streamLimiter, int streamPermits, SimplePermitLimiter globalLimiter, int globalPermits) {
        int actualStream = streamLimiter.getPermits();
        assertEquals(streamPermits, actualStream);
        int actualGlobal = globalLimiter.getPermits();
        assertEquals(globalPermits, actualGlobal);
    }
}
