blob: 6711aed924c20bc603a922a362cc60d50d8e67fe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Method;
import java.time.Clock;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class InMemoryDeliveryTrackerTest extends AbstractDeliveryTrackerTest {
@DataProvider(name = "delayedTracker")
public Object[][] provider(Method method) throws Exception {
dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
clock = mock(Clock.class);
clockTime = new AtomicLong();
when(clock.millis()).then(x -> clockTime.get());
final String methodName = method.getName();
return switch (methodName) {
case "test" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false, 0)
}};
case "testWithTimer" -> {
Timer timer = mock(Timer.class);
AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());
NavigableMap<Long, TimerTask> tasks = new TreeMap<>();
when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> {
TimerTask task = invocation.getArgument(0, TimerTask.class);
long timeout = invocation.getArgument(1, Long.class);
TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
long scheduleAt = clockTime.get() + unit.toMillis(timeout);
tasks.put(scheduleAt, task);
Timeout t = mock(Timeout.class);
when(t.cancel()).then(i -> {
tasks.remove(scheduleAt, task);
return null;
});
return t;
});
yield new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false, 0),
tasks
}};
}
case "testAddWithinTickTime" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
false, 0)
}};
case "testAddMessageWithStrictDelay" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
true, 0)
}};
case "testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, 0)
}};
case "testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100000, clock,
true, 0)
}};
case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500, clock,
true, 0)
}};
case "testWithFixedDelays", "testWithMixedDelays","testWithNoDelays" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500, clock,
true, 100)
}};
default -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true, 0)
}};
};
}
@Test(dataProvider = "delayedTracker")
public void testWithFixedDelays(InMemoryDelayedDeliveryTracker tracker) throws Exception {
assertFalse(tracker.hasMessageAvailable());
assertTrue(tracker.addMessage(1, 1, 10));
assertTrue(tracker.addMessage(2, 2, 20));
assertTrue(tracker.addMessage(3, 3, 30));
assertTrue(tracker.addMessage(4, 4, 40));
assertTrue(tracker.addMessage(5, 5, 50));
assertFalse(tracker.hasMessageAvailable());
assertEquals(tracker.getNumberOfDelayedMessages(), 5);
assertFalse(tracker.shouldPauseAllDeliveries());
for (int i = 6; i <= tracker.getFixedDelayDetectionLookahead(); i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}
assertTrue(tracker.shouldPauseAllDeliveries());
clockTime.set(tracker.getFixedDelayDetectionLookahead() * 10);
tracker.getScheduledMessages(100);
assertFalse(tracker.shouldPauseAllDeliveries());
// Empty the tracker
int removed = 0;
do {
removed = tracker.getScheduledMessages(100).size();
} while (removed > 0);
assertFalse(tracker.shouldPauseAllDeliveries());
tracker.close();
}
@Test(dataProvider = "delayedTracker")
public void testWithMixedDelays(InMemoryDelayedDeliveryTracker tracker) throws Exception {
assertFalse(tracker.hasMessageAvailable());
assertTrue(tracker.addMessage(1, 1, 10));
assertTrue(tracker.addMessage(2, 2, 20));
assertTrue(tracker.addMessage(3, 3, 30));
assertTrue(tracker.addMessage(4, 4, 40));
assertTrue(tracker.addMessage(5, 5, 50));
assertFalse(tracker.shouldPauseAllDeliveries());
for (int i = 6; i <= tracker.getFixedDelayDetectionLookahead(); i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}
assertTrue(tracker.shouldPauseAllDeliveries());
// Add message with earlier delivery time
assertTrue(tracker.addMessage(5, 6, 5));
assertFalse(tracker.shouldPauseAllDeliveries());
tracker.close();
}
@Test(dataProvider = "delayedTracker")
public void testWithNoDelays(InMemoryDelayedDeliveryTracker tracker) throws Exception {
assertFalse(tracker.hasMessageAvailable());
assertTrue(tracker.addMessage(1, 1, 10));
assertTrue(tracker.addMessage(2, 2, 20));
assertTrue(tracker.addMessage(3, 3, 30));
assertTrue(tracker.addMessage(4, 4, 40));
assertTrue(tracker.addMessage(5, 5, 50));
assertFalse(tracker.shouldPauseAllDeliveries());
for (int i = 6; i <= tracker.getFixedDelayDetectionLookahead(); i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}
assertTrue(tracker.shouldPauseAllDeliveries());
// Add message with no-delay
assertFalse(tracker.addMessage(5, 6, -1L));
assertFalse(tracker.shouldPauseAllDeliveries());
tracker.close();
}
@Test
public void testClose() throws Exception {
Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
1, TimeUnit.MILLISECONDS);
PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());
final Exception[] exceptions = new Exception[1];
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true, 0) {
@Override
public void run(Timeout timeout) throws Exception {
super.timeout = timer.newTimeout(this, 1, TimeUnit.MILLISECONDS);
if (timeout == null || timeout.isCancelled()) {
return;
}
try {
this.priorityQueue.peekN1();
} catch (Exception e) {
e.printStackTrace();
exceptions[0] = e;
}
}
};
tracker.addMessage(1, 1, 10);
clockTime.set(10);
Thread.sleep(300);
tracker.close();
assertNull(exceptions[0]);
timer.stop();
}
}