blob: aa64ccf0363e593f825f1bfdc68e0ef87f040f87 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.io.fs;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.apache.apex.malhar.lib.testbench.CollectorTestSink;
import org.apache.apex.malhar.lib.testbench.RandomWordGenerator;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.Lists;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
/**
* Test class to test {@link AbstractReconciler}
*/
public class AbstractReconcilerTest
{
public static class TestReconciler extends AbstractReconciler<byte[], TestMeta>
{
private long currentCommittedWindow;
private StringBuilder windowData = new StringBuilder();
@Override
protected void processTuple(byte[] input)
{
if (windowData.length() > 0) {
windowData.append(",");
}
windowData.append(input);
}
@Override
public void endWindow()
{
TestMeta meta = new TestMeta();
meta.windowid = currentWindowId;
meta.data = windowData.toString();
enqueueForProcessing(meta);
windowData.setLength(0);
}
@Override
protected void processCommittedData(TestMeta meta)
{
// only committed windows have to be processed
Assert.assertTrue("processing window id should not be greater than commited window id", meta.windowid <= currentCommittedWindow);
}
@Override
public void committed(long l)
{
currentCommittedWindow = l;
super.committed(l);
}
}
public static class TestMeta
{
long windowid;
String data;
}
public static class TestDag implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
RandomWordGenerator generator = dag.addOperator("words", new RandomWordGenerator());
TestReconciler reconciler = dag.addOperator("synchronizer", new TestReconciler());
generator.setTuplesPerWindow(10);
dag.addStream("toWriter", generator.output, reconciler.input);
}
}
@Test
public void testReconciler() throws Exception
{
LocalMode lma = LocalMode.newInstance();
Configuration configuration = new Configuration();
lma.prepareDAG(new TestDag(), configuration);
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
lc.run(15000);
lc.shutdown();
}
public static class TestReconciler1 extends AbstractReconciler<String, String>
{
public transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<String>();
private List<String> emitData = new ArrayList<String>();
@Override
public void beginWindow(long windowId)
{
for (String data : emitData) {
outputPort.emit(data);
}
emitData.clear();
super.beginWindow(windowId);
}
@Override
protected void processTuple(String input)
{
enqueueForProcessing(input);
}
@Override
protected void processCommittedData(String meta)
{
emitData.add(meta);
}
}
@Test
public void testOperator() throws Exception
{
CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
TestReconciler1 reconciler1 = new TestReconciler1();
List<String> output = Lists.newArrayList();
reconciler1.outputPort.setSink(sink);
reconciler1.setup(null);
int windowId = 0;
reconciler1.beginWindow(windowId++);
reconciler1.input.process("a");
reconciler1.input.process("b");
reconciler1.endWindow();
reconciler1.beginWindow(windowId++);
reconciler1.input.process("c");
reconciler1.input.process("d");
reconciler1.endWindow();
reconciler1.committed(0);
Thread.sleep(500);
reconciler1.beginWindow(windowId++);
reconciler1.input.process("e");
reconciler1.input.process("f");
reconciler1.endWindow();
reconciler1.beginWindow(windowId++);
reconciler1.input.process("g");
reconciler1.input.process("h");
reconciler1.endWindow();
output.add("a");
output.add("b");
Assert.assertEquals(output, sink.collectedTuples);
output.clear();
sink.collectedTuples.clear();
reconciler1.committed(2);
Thread.sleep(500);
reconciler1.beginWindow(windowId++);
reconciler1.endWindow();
output.add("c");
output.add("d");
output.add("e");
output.add("f");
Assert.assertEquals(output, sink.collectedTuples);
reconciler1.teardown();
}
}