blob: d7b304d8a0c24281aee1dc9bf6c39c601fdd8053 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Clock;
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class InMemoryDeliveryTrackerTest {
@Test
public void test() throws Exception {
PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
Timer timer = mock(Timer.class);
AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock);
assertFalse(tracker.hasMessageAvailable());
assertTrue(tracker.addMessage(2, 2, 20));
assertTrue(tracker.addMessage(1, 1, 10));
assertTrue(tracker.addMessage(3, 3, 30));
assertTrue(tracker.addMessage(5, 5, 50));
assertTrue(tracker.addMessage(4, 4, 40));
assertFalse(tracker.hasMessageAvailable());
assertEquals(tracker.getNumberOfDelayedMessages(), 5);
assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());
// Move time forward
clockTime.set(15);
// Message is rejected by tracker since it's already ready to send
assertFalse(tracker.addMessage(6, 6, 10));
assertEquals(tracker.getNumberOfDelayedMessages(), 5);
assertTrue(tracker.hasMessageAvailable());
Set<PositionImpl> scheduled = tracker.getScheduledMessages(10);
assertEquals(scheduled.size(), 1);
// Move time forward
clockTime.set(60);
assertEquals(tracker.getNumberOfDelayedMessages(), 4);
assertTrue(tracker.hasMessageAvailable());
scheduled = tracker.getScheduledMessages(1);
assertEquals(scheduled.size(), 1);
assertEquals(tracker.getNumberOfDelayedMessages(), 3);
assertTrue(tracker.hasMessageAvailable());
scheduled = tracker.getScheduledMessages(3);
assertEquals(scheduled.size(), 3);
assertEquals(tracker.getNumberOfDelayedMessages(), 0);
assertFalse(tracker.hasMessageAvailable());
assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());
}
@Test
public void testWithTimer() throws Exception {
PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
Timer timer = mock(Timer.class);
AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());
NavigableMap<Long, TimerTask> tasks = new TreeMap<>();
when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> {
TimerTask task = invocation.getArgument(0, TimerTask.class);
long timeout = invocation.getArgument(1, Long.class);
TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
long scheduleAt = clockTime.get() + unit.toMillis(timeout);
tasks.put(scheduleAt, task);
Timeout t = mock(Timeout.class);
when(t.cancel()).then(i -> {
tasks.remove(scheduleAt, task);
return null;
});
return t;
});
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock);
assertTrue(tasks.isEmpty());
assertTrue(tracker.addMessage(2, 2, 20));
assertEquals(tasks.size(), 1);
assertEquals(tasks.firstKey().longValue(), 20);
assertTrue(tracker.addMessage(1, 1, 10));
assertEquals(tasks.size(), 1);
assertEquals(tasks.firstKey().longValue(), 10);
assertTrue(tracker.addMessage(3, 3, 30));
assertEquals(tasks.size(), 1);
assertEquals(tasks.firstKey().longValue(), 10);
clockTime.set(15);
TimerTask task = tasks.pollFirstEntry().getValue();
Timeout cancelledTimeout = mock(Timeout.class);
when(cancelledTimeout.isCancelled()).thenReturn(true);
task.run(cancelledTimeout);
verifyZeroInteractions(dispatcher);
task.run(mock(Timeout.class));
verify(dispatcher).readMoreEntries();
}
/**
* Adding a message that is about to expire within the tick time should lead
* to a rejection from the tracker.
*/
@Test
public void testAddWithinTickTime() {
PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
Timer timer = mock(Timer.class);
AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock);
clockTime.set(0);
assertFalse(tracker.addMessage(1, 1, 10));
assertFalse(tracker.addMessage(2, 2, 99));
assertTrue(tracker.addMessage(3, 3, 100));
assertTrue(tracker.addMessage(4, 4, 200));
assertEquals(tracker.getNumberOfDelayedMessages(), 2);
}
}