blob: 5cc9d1ab0bd200edf944e97f7ad4fbe0814950cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.FreeRunningClock;

import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
// TODO: incomplete - only remove() and message expiration (on iteration and on add) are covered so far
public class OutboundMessageQueueTest
{
/**
 * One-time setup for this test class: invokes {@link DatabaseDescriptor#daemonInitialization()} before any
 * test method runs.
 */
@BeforeClass
public static void init()
{
    // NOTE(review): presumably needed so the messaging classes under test can read their configuration - confirm.
    DatabaseDescriptor.daemonInitialization();
}
/**
 * Exercises {@code OutboundMessageQueue#remove} in two situations: directly on the test thread, and from two
 * worker threads that are started while a helper thread is holding the queue lock.
 *
 * The outcome of each concurrent removal is recorded by its worker and asserted on the test thread. An
 * assertion failing on a worker thread would not be reported to JUnit and would stop that worker from counting
 * down its latch, leaving the test thread blocked forever. For the same reason every wait performed by the
 * test thread is bounded.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting on one of the latches
 */
@Test
public void testRemove() throws InterruptedException
{
    // Upper bound for every wait on the test thread, so that a regression fails the test instead of hanging it.
    final long timeoutSeconds = 30;

    final Message<?> m1 = Message.out(Verb._TEST_1, noPayload);
    final Message<?> m2 = Message.out(Verb._TEST_1, noPayload);
    final Message<?> m3 = Message.out(Verb._TEST_1, noPayload);

    final OutboundMessageQueue queue = new OutboundMessageQueue(approxTime, message -> true);
    queue.add(m1);
    queue.add(m2);
    queue.add(m3);

    // Removal on the test thread: the first attempt succeeds, a second attempt for the same message does not.
    Assert.assertTrue(queue.remove(m1));
    Assert.assertFalse(queue.remove(m1));

    CountDownLatch locked = new CountDownLatch(1);
    CountDownLatch lockUntil = new CountDownLatch(1);
    CountDownLatch lockReleased = new CountDownLatch(1);

    // Helper thread: obtain the queue lock and keep it until the test thread counts down lockUntil.
    // NOTE(review): assumes lockOrCallback hands out the lock here because nobody else holds it - confirm.
    new Thread(() -> {
        try (OutboundMessageQueue.WithLock lock = queue.lockOrCallback(0, () -> {}))
        {
            locked.countDown();
            Uninterruptibles.awaitUninterruptibly(lockUntil);
        }
        lockReleased.countDown();
    }).start();

    CountDownLatch start = new CountDownLatch(2);
    CountDownLatch finish = new CountDownLatch(2);
    // Results of the concurrent removals, published by the workers and checked on the test thread.
    AtomicBoolean removedM2 = new AtomicBoolean(false);
    AtomicBoolean removedM3 = new AtomicBoolean(false);

    try
    {
        Assert.assertTrue("helper thread did not take the queue lock in time",
                          locked.await(timeoutSeconds, TimeUnit.SECONDS));

        new Thread(() -> {
            start.countDown();
            try
            {
                removedM2.set(queue.remove(m2));
            }
            finally
            {
                // Count down even if remove() throws, so the test thread is never left waiting.
                finish.countDown();
            }
        }).start();

        new Thread(() -> {
            start.countDown();
            try
            {
                removedM3.set(queue.remove(m3));
            }
            finally
            {
                finish.countDown();
            }
        }).start();

        Assert.assertTrue("remover threads did not start in time",
                          start.await(timeoutSeconds, TimeUnit.SECONDS));
    }
    finally
    {
        // Always let the helper thread release the lock, even if one of the waits above failed.
        lockUntil.countDown();
    }

    Assert.assertTrue("concurrent removals did not complete in time",
                      finish.await(timeoutSeconds, TimeUnit.SECONDS));
    Assert.assertTrue("helper thread did not release the queue lock in time",
                      lockReleased.await(timeoutSeconds, TimeUnit.SECONDS));

    Assert.assertTrue("m2 was not removed", removedM2.get());
    Assert.assertTrue("m3 was not removed", removedM3.get());

    // All three messages have been removed, so there must be nothing left at the head of the queue.
    try (OutboundMessageQueue.WithLock lock = queue.lockOrCallback(0, () -> {}))
    {
        Assert.assertNull(lock.peek());
    }
}
/**
 * Drives a queue backed by a manually advanced {@link FreeRunningClock} through several lock/close cycles and
 * checks, after each cycle, which messages were handed to the expiry callback and what
 * {@code nextExpirationIn} reports.
 */
@Test
public void testExpirationOnIteration()
{
    FreeRunningClock clock = new FreeRunningClock(approxTime.now());

    // Collects, in order, every message the queue passes to its expiry callback.
    List<Message<?>> expiredMessages = new LinkedList<>();

    long startTime = clock.now();

    Message<?> m1 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(7));
    Message<?> m2 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(3));
    // Created later, while the queue lock is held, but needed after the corresponding try blocks.
    Message<?> m3;
    Message<?> m4;

    OutboundMessageQueue queue = new OutboundMessageQueue(clock, m -> expiredMessages.add(m));
    queue.add(m1);
    queue.add(m2);

    try (OutboundMessageQueue.WithLock ignored = queue.lockOrCallback(clock.now(), () -> {}))
    {
        // Do nothing
    }

    // Check next expiry time is equal to m2, and we haven't expired anything yet:
    Assert.assertEquals(3, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
    Assert.assertTrue(expiredMessages.isEmpty());

    // Wait for m2 expiry time:
    clock.advance(4, TimeUnit.SECONDS);

    try (OutboundMessageQueue.WithLock ignored = queue.lockOrCallback(clock.now(), () -> {}))
    {
        // Add a new message while we're iterating the queue: this will expire later than any existing message.
        m3 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(60));
        queue.add(m3);
    }

    // After expiration runs following the WithLock#close(), check the expiration time is updated to m1 (not m3):
    Assert.assertEquals(7, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
    // Also, m2 was expired and collected:
    Assert.assertEquals(m2, expiredMessages.remove(0));

    // Wait for m1 expiry time:
    clock.advance(4, TimeUnit.SECONDS);

    try (OutboundMessageQueue.WithLock ignored = queue.lockOrCallback(clock.now(), () -> {}))
    {
        // Add a new message while we're iterating the queue: this will expire sooner than the already existing message.
        m4 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(10));
        queue.add(m4);
    }

    // Check m1 was expired and collected:
    Assert.assertEquals(m1, expiredMessages.remove(0));
    // Check next expiry time is m4 (not m3):
    Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));

    // Consume all messages before expiration:
    try (OutboundMessageQueue.WithLock lock = queue.lockOrCallback(clock.now(), () -> {}))
    {
        Assert.assertEquals(m3, lock.poll());
        Assert.assertEquals(m4, lock.poll());
    }

    // Check next expiry time is still m4 as the deadline hasn't passed yet:
    Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));

    // Go past the deadline:
    clock.advance(4, TimeUnit.SECONDS);

    try (OutboundMessageQueue.WithLock ignored = queue.lockOrCallback(clock.now(), () -> {}))
    {
        // Do nothing, just trigger expiration on close
    }

    // Check nothing is expired:
    Assert.assertTrue(expiredMessages.isEmpty());
    // Check next expiry time is now Long.MAX_VALUE as nothing was in the queue:
    Assert.assertEquals(Long.MAX_VALUE, queue.nextExpirationIn(0, TimeUnit.NANOSECONDS));
}
/**
 * Checks expiration triggered by {@code OutboundMessageQueue#add}: using a manually advanced
 * {@link FreeRunningClock}, verifies which messages reach the expiry callback and what
 * {@code nextExpirationIn} reports, both when adding without the queue lock and when adding while the test
 * itself holds the lock.
 */
@Test
public void testExpirationOnAdd()
{
    FreeRunningClock clock = new FreeRunningClock(approxTime.now());

    // Collects, in order, every message the queue passes to its expiry callback.
    List<Message<?>> expiredMessages = new LinkedList<>();

    long startTime = clock.now();

    OutboundMessageQueue queue = new OutboundMessageQueue(clock, m -> expiredMessages.add(m));

    Message<?> m1 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(7));
    Message<?> m2 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(3));
    queue.add(m1);
    queue.add(m2);

    // Check next expiry time is equal to m2, and we haven't expired anything yet:
    Assert.assertEquals(3, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
    Assert.assertTrue(expiredMessages.isEmpty());

    // Go past m1 expiry time:
    clock.advance(8, TimeUnit.SECONDS);

    // Add a new message and verify both m1 and m2 have been expired:
    Message<?> m3 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(10));
    queue.add(m3);
    Assert.assertEquals(m2, expiredMessages.remove(0));
    Assert.assertEquals(m1, expiredMessages.remove(0));

    // New expiration deadline is m3:
    Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));

    // Go past m3 expiry time:
    clock.advance(4, TimeUnit.SECONDS);

    try (OutboundMessageQueue.WithLock ignored = queue.lockOrCallback(clock.now(), () -> {}))
    {
        // Add a new message and verify nothing is expired because the lock is held by this iteration:
        Message<?> m4 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(15));
        queue.add(m4);
        Assert.assertTrue(expiredMessages.isEmpty());

        // Also the deadline didn't change, even though we're past the m3 expiry time: this way we're sure the
        // pruner will run promptly even if falling behind during iteration.
        Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
    }

    // Check post iteration m3 has expired:
    Assert.assertEquals(m3, expiredMessages.remove(0));
    // And deadline is now m4:
    Assert.assertEquals(15, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
}
}