| /* |
| * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.datatorrent.lib.algo; |
| |
| /** |
| * |
| * Functional tests for {@link com.datatorrent.lib.algo.AbstractStreamPatternMatcher}. |
| * |
| */ |
| |
| import java.util.Random; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import com.datatorrent.lib.testbench.CollectorTestSink; |
| import com.datatorrent.api.DefaultOutputPort; |
| |
| |
| |
| public class AbstractStreamPatternMatcherTest |
| { |
| |
| public static class StreamPatternMatcher<T> extends AbstractStreamPatternMatcher<T> |
| { |
| @Override |
| public void processPatternFound() |
| { |
| outputPort.emit(getPattern().getStates()); |
| } |
| |
| public transient DefaultOutputPort<T[]> outputPort = new DefaultOutputPort<T[]>(); |
| } |
| |
| private StreamPatternMatcher<Integer> streamPatternMatcher; |
| private AbstractStreamPatternMatcher.Pattern<Integer> pattern; |
| private Integer[] inputPattern; |
| private CollectorTestSink<Object> sink; |
| |
| @Before |
| public void setup() |
| { |
| streamPatternMatcher = new StreamPatternMatcher<Integer>(); |
| sink = new CollectorTestSink<Object>(); |
| streamPatternMatcher.outputPort.setSink(sink); |
| } |
| |
| @After |
| public void cleanup() |
| { |
| streamPatternMatcher.teardown(); |
| sink.collectedTuples.clear(); |
| } |
| |
| @Test |
| public void test() throws Exception |
| { |
| inputPattern = new Integer[]{0, 1, 0, 1, 2}; |
| pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern); |
| streamPatternMatcher.setPattern(pattern); |
| streamPatternMatcher.setup(null); |
| streamPatternMatcher.beginWindow(0); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.inputPort.process(1); |
| streamPatternMatcher.inputPort.process(1); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.inputPort.process(1); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.inputPort.process(1); |
| streamPatternMatcher.inputPort.process(2); |
| streamPatternMatcher.inputPort.process(1); |
| streamPatternMatcher.endWindow(); |
| Assert.assertEquals("The number of tuples emitted is one", 1, sink.collectedTuples.size()); |
| Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, sink.collectedTuples.get(0)); |
| } |
| |
| @Test |
| public void testSimplePattern() throws Exception |
| { |
| inputPattern = new Integer[]{0, 0}; |
| pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern); |
| streamPatternMatcher.setPattern(pattern); |
| streamPatternMatcher.setup(null); |
| streamPatternMatcher.beginWindow(0); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.inputPort.process(1); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.endWindow(); |
| Assert.assertEquals("The number of tuples emitted are three", 3, sink.collectedTuples.size()); |
| for (Object object : sink.collectedTuples) { |
| Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, object); |
| } |
| } |
| |
| @Test |
| public void testPatternWithSingleState() throws Exception |
| { |
| inputPattern = new Integer[]{0}; |
| pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern); |
| streamPatternMatcher.setPattern(pattern); |
| streamPatternMatcher.setup(null); |
| streamPatternMatcher.beginWindow(0); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.inputPort.process(1); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.inputPort.process(0); |
| streamPatternMatcher.endWindow(); |
| Assert.assertEquals("The number of tuples emitted are three", 5, sink.collectedTuples.size()); |
| for (Object object : sink.collectedTuples) { |
| Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, object); |
| } |
| } |
| |
| @Test |
| public void testAutoGeneratedPattern() throws Exception |
| { |
| Random random = new Random(); |
| int patternSize = 15; |
| inputPattern = new Integer[patternSize]; |
| int max = 10; |
| int min = 1; |
| int primeNumber = 5; |
| for (int i = 0; i < patternSize; i++) { |
| inputPattern[i] = (min + random.nextInt(max)); |
| } |
| pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern); |
| streamPatternMatcher.setPattern(pattern); |
| streamPatternMatcher.setup(null); |
| streamPatternMatcher.beginWindow(0); |
| int numberOfIterations = 20; |
| for (int i = 0; i < patternSize; i++) { |
| for (int j = 0; j <= i; j++) { |
| streamPatternMatcher.inputPort.process(inputPattern[j]); |
| } |
| for (int k = 0; k < numberOfIterations; k++) { |
| streamPatternMatcher.inputPort.process(max + min + random.nextInt(max)); |
| } |
| if (i % primeNumber == 0) { |
| for (int j = 0; j < patternSize; j++) { |
| streamPatternMatcher.inputPort.process(inputPattern[j]); |
| } |
| } |
| } |
| streamPatternMatcher.endWindow(); |
| Assert.assertEquals("The number of tuples emitted ", 1 + patternSize / primeNumber, sink.collectedTuples.size()); |
| for (Object output : sink.collectedTuples) { |
| Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, output); |
| } |
| } |
| } |