blob: 14adb610318c7aa1c8ef6b7582d922e9a7addcb4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.appdata.query;
import java.util.Random;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.appdata.query.serde.MessageSerializerFactory;
import org.apache.apex.malhar.lib.appdata.schemas.ResultFormatter;
import org.apache.apex.malhar.lib.testbench.CollectorTestSink;
import org.apache.apex.malhar.lib.util.TestUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import com.datatorrent.api.DefaultOutputPort;
public class QueryManagerAsynchronousTest
{
@Rule
public TestWatcher testMeta = new InterruptClear();
public static class InterruptClear extends TestWatcher
{
@Override
protected void starting(Description description)
{
Thread.interrupted();
}
@Override
protected void finished(Description description)
{
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
//noop
}
Thread.interrupted();
}
}
@Test
public void stressTest() throws Exception
{
final int totalTuples = 100000;
final int batchSize = 100;
final double waitMillisProb = .01;
AppDataWindowEndQueueManager<MockQuery, Void> queueManager = new AppDataWindowEndQueueManager<MockQuery, Void>();
DefaultOutputPort<String> outputPort = new DefaultOutputPort<String>();
CollectorTestSink<MockResult> sink = new CollectorTestSink<MockResult>();
TestUtils.setSink(outputPort, sink);
MessageSerializerFactory msf = new MessageSerializerFactory(new ResultFormatter());
QueryManagerAsynchronous<MockQuery, Void, MutableLong, MockResult> queryManagerAsynch =
new QueryManagerAsynchronous<>(outputPort, queueManager, new NOPQueryExecutor(waitMillisProb), msf,
Thread.currentThread());
Thread producerThread = new Thread(new ProducerThread(queueManager, totalTuples, batchSize, waitMillisProb));
producerThread.start();
producerThread.setName("Producer Thread");
long startTime = System.currentTimeMillis();
queryManagerAsynch.setup(null);
int numWindows = 0;
for (; sink.collectedTuples.size() < totalTuples && ((System.currentTimeMillis() - startTime) < 60000);
numWindows++) {
queryManagerAsynch.beginWindow(numWindows);
Thread.sleep(100);
queryManagerAsynch.endWindow();
}
producerThread.stop();
queryManagerAsynch.teardown();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//Do Nothing
}
Assert.assertEquals(totalTuples, sink.collectedTuples.size());
}
public static class NOPQueryExecutor implements QueryExecutor<MockQuery, Void, MutableLong, MockResult>
{
private final double waitMillisProb;
private final Random rand = new Random();
public NOPQueryExecutor(double waitMillisProb)
{
this.waitMillisProb = waitMillisProb;
}
@Override
public MockResult executeQuery(MockQuery query, Void metaQuery, MutableLong queueContext)
{
if (rand.nextDouble() < waitMillisProb) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
return new MockResult(query);
}
}
public static class ProducerThread implements Runnable
{
private final int totalTuples;
private final int batchSize;
private final AppDataWindowEndQueueManager<MockQuery, Void> queueManager;
private final double waitMillisProb;
private final Random rand = new Random();
public ProducerThread(AppDataWindowEndQueueManager<MockQuery, Void> queueManager, int totalTuples, int batchSize,
double waitMillisProb)
{
this.queueManager = queueManager;
this.totalTuples = totalTuples;
this.batchSize = batchSize;
this.waitMillisProb = waitMillisProb;
}
@Override
public void run()
{
int numLoops = totalTuples / batchSize;
for (int loopCounter = 0, tupleCounter = 0; loopCounter < numLoops; loopCounter++, tupleCounter++) {
for (int batchCounter = 0; batchCounter < batchSize; batchCounter++, tupleCounter++) {
queueManager.enqueue(new MockQuery(tupleCounter + ""), null, new MutableLong(1L));
if (rand.nextDouble() < waitMillisProb) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
}
}
private static final Logger LOG = LoggerFactory.getLogger(QueryManagerAsynchronousTest.class);
}