blob: 778f3f8d75aba330b92093c045dbd2e0d703bc4f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Collections
import java.util.concurrent.{DelayQueue, TimeUnit}
import org.apache.kafka.common.metrics.MetricConfig
import org.apache.kafka.common.utils.MockTime
import org.junit.{Assert, Before, Test}
/**
 * Tests for `ThrottledResponse` delay bookkeeping and for the way
 * `ClientQuotaManager.ThrottledRequestReaper` drains a `DelayQueue` of such responses.
 *
 * All timing is driven by a `MockTime`, so the clock only moves when a test calls
 * `time.sleep`.
 */
class ThrottledResponseExpirationTest {
  private val time = new MockTime
  // Number of times `callback` has been invoked during the current test.
  private var numCallbacks: Int = 0
  private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(),
                                                                    Collections.emptyList(),
                                                                    time)

  /**
   * Callback handed to every `ThrottledResponse` created by these tests.
   * It only counts how often it is invoked.
   *
   * @param delayTimeMs value supplied by the caller of the callback; ignored here
   */
  def callback(delayTimeMs: Int): Unit = {
    numCallbacks += 1
  }

  /** Resets the callback counter so every test starts from zero. */
  @Before
  def beforeMethod(): Unit = {
    numCallbacks = 0
  }

  /**
   * Closes the metrics registry created for this test instance so that it is not
   * leaked once the test method has finished.
   */
  @org.junit.After
  def afterMethod(): Unit = {
    metrics.close()
  }

  /**
   * Queues four throttled responses out of order and checks that successive
   * `doWork()` calls on the reaper fire their callbacks one at a time as the mock
   * clock advances, and that nothing more fires once the queue is empty.
   */
  @Test
  def testExpire(): Unit = {
    val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, "producer", time)

    val delayQueue = new DelayQueue[ThrottledResponse]()
    val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue)
    try {
      // Add 4 elements to the queue out of order. Add 2 elements with the same expire timestamp
      delayQueue.add(new ThrottledResponse(time, 10, callback))
      delayQueue.add(new ThrottledResponse(time, 30, callback))
      delayQueue.add(new ThrottledResponse(time, 30, callback))
      delayQueue.add(new ThrottledResponse(time, 20, callback))

      // Each 10 ms step followed by one doWork() is expected to add exactly one callback.
      for (itr <- 1 to 3) {
        time.sleep(10)
        reaper.doWork()
        Assert.assertEquals(itr, numCallbacks)
      }

      // The second response with a 30 ms delay is picked up without advancing the clock.
      reaper.doWork()
      Assert.assertEquals(4, numCallbacks)
      Assert.assertEquals(0, delayQueue.size())

      // With the queue drained, another pass must not trigger any further callback.
      reaper.doWork()
      Assert.assertEquals(4, numCallbacks)
    } finally {
      clientMetrics.shutdown()
    }
  }

  /**
   * Checks that a `ThrottledResponse` reports the throttle time it was created with
   * and that its remaining delay shrinks by exactly the amount the mock clock advances.
   */
  @Test
  def testThrottledRequest(): Unit = {
    val t1: ThrottledResponse = new ThrottledResponse(time, 10, callback)
    val t2: ThrottledResponse = new ThrottledResponse(time, 20, callback)
    val t3: ThrottledResponse = new ThrottledResponse(time, 20, callback)
    Assert.assertEquals(10, t1.throttleTimeMs)
    Assert.assertEquals(20, t2.throttleTimeMs)
    Assert.assertEquals(20, t3.throttleTimeMs)

    // Sample the delay at 0, 10 and 20 ms of elapsed mock time; on the last pass the
    // expected remaining delay for t1 is negative (-10).
    for (itr <- 0 to 2) {
      Assert.assertEquals(10 - 10*itr, t1.getDelay(TimeUnit.MILLISECONDS))
      Assert.assertEquals(20 - 10*itr, t2.getDelay(TimeUnit.MILLISECONDS))
      Assert.assertEquals(20 - 10*itr, t3.getDelay(TimeUnit.MILLISECONDS))
      time.sleep(10)
    }
  }
}