blob: 54d1e70ad7ee225a86471f41f70839f446ccab1a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.tso;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.tso.ReplyProcessorImpl.ReplyBatchEvent;
import org.jboss.netty.channel.Channel;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.lmax.disruptor.BlockingWaitStrategy;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
/**
 * Unit tests for {@link ReplyProcessorImpl}.
 *
 * <p>The tests feed {@link ReplyBatchEvent}s directly into {@code onEvent()} and check three things:
 * the id of the next batch the processor expects ({@code nextIDToHandle}), the batches parked because
 * they arrived ahead of their turn ({@code futureEvents}) and the number of batches borrowed from /
 * idle in the batch pool.
 *
 * <p>A fresh pool and a fresh (spied) processor are created before every test method, so tests do
 * not share state.
 */
public class TestReplyProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(TestReplyProcessor.class);

    // Arbitrary disruptor sequence passed to onEvent() in every test
    private static final long ANY_DISRUPTOR_SEQUENCE = 1234L;

    public static final int BATCH_POOL_SIZE = 3;

    // Start (ST) and commit (CT) timestamps used to populate the test batches
    private static final long FIRST_ST = 0L;
    private static final long FIRST_CT = 1L;
    private static final long SECOND_ST = 2L;
    private static final long SECOND_CT = 3L;
    private static final long THIRD_ST = 4L;
    private static final long THIRD_CT = 5L;
    private static final long FOURTH_ST = 6L;
    private static final long FOURTH_CT = 7L;
    private static final long FIFTH_ST = 8L;
    private static final long FIFTH_CT = 9L;
    private static final long SIXTH_ST = 10L;

    @Mock
    private Panicker panicker;
    @Mock
    private MonitoringContextImpl monCtx;

    private MetricsRegistry metrics;

    private ObjectPool<Batch> batchPool;

    // Component under test
    private ReplyProcessorImpl replyProcessor;

    /**
     * Builds the mocks, a batch pool configured through {@link #BATCH_POOL_SIZE} and a spied
     * {@link ReplyProcessorImpl} wired to the mocked panicker.
     */
    @BeforeMethod(alwaysRun = true, timeOut = 30_000)
    public void initMocksAndComponents() throws Exception {
        MockitoAnnotations.initMocks(this);

        TSOServerConfig tsoConfig = new TSOServerConfig();
        tsoConfig.setNumConcurrentCTWriters(BATCH_POOL_SIZE);

        // Configure null metrics provider
        metrics = new NullMetricsProvider();

        batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());

        replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool));
    }

    @AfterMethod
    void afterMethod() {
        // Intentionally empty: every fixture is re-created in initMocksAndComponents()
    }

    /**
     * Processing a batch that contains a commit-retry entry must end in a {@link RuntimeException}
     * when the processor is configured with a {@link RuntimeExceptionPanicker}, and must leave the
     * processor and pool state untouched.
     */
    @Test(timeOut = 10_000)
    public void testBadFormedPackageThrowsException() throws Exception {

        // We need an instance throwing exceptions for this test
        replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, new RuntimeExceptionPanicker(), batchPool));

        // Prepare test batch
        // NOTE(review): presumably the commit-retry entry is what makes this batch "bad formed" - confirm
        Batch batch = batchPool.borrowObject();
        batch.addCommitRetry(FIRST_ST, null, monCtx);
        ReplyBatchEvent event = newReplyBatchEvent(batch, 0);

        assertProcessorAndPoolState(0, 0, 1);

        try {
            replyProcessor.onEvent(event, ANY_DISRUPTOR_SEQUENCE, false);
            // fail() raises an AssertionError, which the catch below does not intercept
            fail("Expected a RuntimeException when processing a bad formed batch");
        } catch (RuntimeException expected) {
            // Expected
        }

        // Nothing must have changed: the batch is still borrowed and no progress was made
        assertProcessorAndPoolState(0, 0, 1);

    }

    /**
     * A batch whose sequence is ahead of the expected one must be parked in {@code futureEvents}
     * without being handled and without being returned to the pool.
     */
    @Test(timeOut = 10_000)
    public void testUnorderedBatchSequenceGetsSaved() throws Exception {

        final long highSequenceNumber = 1234L; // Should be greater than 0

        // Prepare test batch
        Batch batch = batchPool.borrowObject();
        ReplyBatchEvent event = newReplyBatchEvent(batch, highSequenceNumber);

        assertProcessorAndPoolState(0, 0, 1);

        replyProcessor.onEvent(event, ANY_DISRUPTOR_SEQUENCE, false);

        // The event is saved for later and the batch stays borrowed
        assertProcessorAndPoolState(0, 1, 1);
        assertTrue(batch.isEmpty());
        verify(replyProcessor, never()).handleReplyBatchEvent(any(ReplyBatchEvent.class));

    }

    /**
     * An empty batch carrying the expected sequence must be handled exactly once, advance the
     * expected sequence and be returned to the pool.
     */
    @Test(timeOut = 10_000)
    public void testProcessingOfEmptyBatchReplyEvent() throws Exception {

        // Prepare test batch
        Batch batch = batchPool.borrowObject();
        ReplyBatchEvent event = newReplyBatchEvent(batch, 0);

        assertProcessorAndPoolState(0, 0, 1);

        replyProcessor.onEvent(event, ANY_DISRUPTOR_SEQUENCE, false);

        assertProcessorAndPoolState(1, 0, 0);
        assertTrue(batch.isEmpty());
        verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(event));

    }

    /**
     * Three batches arrive in reverse order (sequences 2, 1 and 0). The first two must be parked;
     * the arrival of sequence 0 must trigger the handling of all three in sequence order, with the
     * replies of each batch sent in that same order.
     */
    @Test(timeOut = 10_000)
    public void testUnorderedSequenceOfBatchReplyEventsThatMustBeOrderedBeforeSendingReplies() throws Exception {

        // Prepare 3 batches with events and simulate a different order of arrival using the batch sequence

        // Prepare first a delayed batch (Batch #3)
        Batch thirdBatch = batchPool.borrowObject();
        thirdBatch.addTimestamp(FIRST_ST, mock(Channel.class), monCtx);
        thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), monCtx);
        ReplyBatchEvent thirdBatchEvent = newReplyBatchEvent(thirdBatch, 2); // Set a higher sequence than the initial one

        assertProcessorAndPoolState(0, 0, 1);

        replyProcessor.onEvent(thirdBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);

        assertProcessorAndPoolState(0, 1, 1);
        assertFalse(thirdBatch.isEmpty());
        verify(replyProcessor, never()).handleReplyBatchEvent(eq(thirdBatchEvent));

        // Prepare another delayed batch (Batch #2)
        Batch secondBatch = batchPool.borrowObject();
        secondBatch.addTimestamp(THIRD_ST, mock(Channel.class), monCtx);
        secondBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), monCtx);
        ReplyBatchEvent secondBatchEvent = newReplyBatchEvent(secondBatch, 1); // Set another higher sequence

        replyProcessor.onEvent(secondBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);

        assertProcessorAndPoolState(0, 2, 2);
        assertFalse(secondBatch.isEmpty());
        assertFalse(thirdBatch.isEmpty());

        // Finally, prepare the batch that should trigger the execution of the other two
        Batch firstBatch = batchPool.borrowObject();
        firstBatch.addAbort(FIFTH_ST, mock(Channel.class), monCtx);
        ReplyBatchEvent firstBatchEvent = newReplyBatchEvent(firstBatch, 0); // Set the sequence the processor is waiting for

        replyProcessor.onEvent(firstBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);

        // All three batches have been processed and handed back to the pool
        assertProcessorAndPoolState(3, 0, 0);
        assertTrue(firstBatch.isEmpty());
        assertTrue(secondBatch.isEmpty());
        assertTrue(thirdBatch.isEmpty());

        // Check the method calls have been properly ordered

        InOrder inOrderReplyBatchEvents = inOrder(replyProcessor);
        inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(firstBatchEvent));
        inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(secondBatchEvent));
        inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(thirdBatchEvent));

        // A separate InOrder is used for the replies so that they are checked only against each other
        InOrder inOrderReplies = inOrder(replyProcessor);
        inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class), eq(monCtx));
        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class), eq(monCtx));
        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx));
        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class), eq(monCtx));
        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx));

    }

    // ----------------------------------------------------------------------------------------------------------------
    // Helper methods
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Creates a reply event wrapping the given batch and tagged with the given batch sequence.
     *
     * @param batch         the batch the event carries
     * @param batchSequence the sequence assigned to the batch
     * @return the populated event
     */
    private static ReplyBatchEvent newReplyBatchEvent(Batch batch, long batchSequence) {
        ReplyBatchEvent event = ReplyBatchEvent.EVENT_FACTORY.newInstance();
        ReplyBatchEvent.makeReplyBatch(event, batch, batchSequence);
        return event;
    }

    /**
     * Asserts the bookkeeping state of the reply processor and of the batch pool. The expected
     * number of idle batches is derived as {@code BATCH_POOL_SIZE - expectedActiveBatches}.
     *
     * @param expectedNextIdToHandle sequence of the batch the processor should be waiting for
     * @param expectedFutureEvents   number of out-of-order events the processor should be holding
     * @param expectedActiveBatches  number of batches that should currently be borrowed from the pool
     */
    private void assertProcessorAndPoolState(long expectedNextIdToHandle,
                                             int expectedFutureEvents,
                                             int expectedActiveBatches) {
        assertEquals(replyProcessor.nextIDToHandle.get(), expectedNextIdToHandle);
        assertEquals(replyProcessor.futureEvents.size(), expectedFutureEvents);
        assertEquals(batchPool.getNumActive(), expectedActiveBatches);
        assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - expectedActiveBatches);
    }

}