blob: 447326f97fef5a2b9279aff683de43625044d7ab [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.debug;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import com.datatorrent.api.Context;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.debug.TupleRecorder.PortInfo;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.WaitCondition;
import com.datatorrent.stram.util.FSPartFileCollection;
/**
 * Tests for {@link TupleRecorder}.
 *
 * <p>{@link #testRecorder()} drives a recorder directly and checks the index, meta and part files
 * it writes. {@link #testRecordingFlow()} starts a small DAG in a {@link StramLocalCluster},
 * turns recording on and off for two operators and checks the files produced under the test
 * work directory.
 */
public class TupleRecorderTest
{
  private static final Logger logger = LoggerFactory.getLogger(TupleRecorderTest.class);

  /** Directory (below {@code target}) that holds everything these tests write. */
  private static final File testWorkDir = new File("target", TupleRecorderTest.class.getName());

  /** Minimum number of tuples that must be recorded per operator in the flow test. */
  private static final long testTupleCount = 10;

  /**
   * Name under which the {@link TupleRecorderCollection} is looked up in the container. Derived
   * from the class itself so it cannot drift from the cast in
   * {@link #getTupleRecorder(StramLocalCluster, PTOperator)}.
   */
  private static final String RECORDER_COLLECTION_CLASSNAME = TupleRecorderCollection.class.getName();

  /**
   * Starts the shared {@link StreamingContainer#eventloop} before each test.
   *
   * @throws IOException declared by the original signature
   */
  @Before
  public void setup() throws IOException
  {
    StreamingContainer.eventloop.start();
  }

  /**
   * Stops the shared {@link StreamingContainer#eventloop} after each test.
   */
  @After
  public void teardown()
  {
    StreamingContainer.eventloop.stop();
  }

  /**
   * Looks up the tuple recorder for an operator in the container that hosts it.
   *
   * @param localCluster cluster in which the operator is deployed
   * @param op physical operator whose recorder is wanted
   * @return whatever the collection returns for the operator id (with a {@code null} port name);
   *         the callers in this class treat {@code null} as "no recorder active"
   */
  public TupleRecorder getTupleRecorder(final StramLocalCluster localCluster, final PTOperator op)
  {
    TupleRecorderCollection instance =
        (TupleRecorderCollection)localCluster.getContainer(op).getInstance(RECORDER_COLLECTION_CLASSNAME);
    return instance.getTupleRecorder(op.getId(), null);
  }

  /**
   * Minimal payload written through the recorder. The part-file assertions in
   * {@link #testRecorder()} expect it to appear as {@code {"key":...,"value":...}}.
   * Declared static so instances carry no reference to the enclosing test object.
   */
  public static class Tuple
  {
    public String key;
    public String value;
  }

  /** Builds a {@link Tuple} with the given field values. */
  private static Tuple newTuple(String key, String value)
  {
    Tuple tuple = new Tuple();
    tuple.key = key;
    tuple.value = value;
    return tuple;
  }

  /** Asserts that {@code line} is present and has the given prefix and suffix. */
  private static void assertLine(String message, String line, String prefix, String suffix)
  {
    Assert.assertNotNull(message + ": line is missing", line);
    Assert.assertTrue(message, line.startsWith(prefix));
    Assert.assertTrue(message, line.endsWith(suffix));
  }

  /**
   * Records four tuples on four ports with a stand-alone recorder and verifies the index file,
   * the meta file and the first part file line by line.
   *
   * @throws IOException if the local file system cannot be initialized or read
   */
  @Test
  public void testRecorder() throws IOException
  {
    try (FileSystem fs = new LocalFileSystem()) {
      TupleRecorder recorder = new TupleRecorder(null, "application_test_id_1");
      recorder.getStorage().setBytesPerPartFile(4096);
      recorder.getStorage().setLocalMode(true);
      recorder.getStorage().setBasePath("file://" + testWorkDir.getAbsolutePath() + "/recordings");

      recorder.addInputPortInfo("ip1", "str1");
      recorder.addInputPortInfo("ip2", "str2");
      recorder.addInputPortInfo("ip3", "str3");
      recorder.addOutputPortInfo("op1", "str4");
      recorder.setup(null, null);

      // NOTE(review): the begin/end window call sequence is kept exactly as it was; the
      // expected file contents asserted below were written against this sequence.
      recorder.beginWindow(1000);
      recorder.beginWindow(1000);
      recorder.beginWindow(1000);
      recorder.writeTuple(newTuple("speed", "5m/h"), "ip1");
      recorder.endWindow();
      recorder.writeTuple(newTuple("speed", "4m/h"), "ip3");
      recorder.endWindow();
      recorder.writeTuple(newTuple("speed", "6m/h"), "ip2");
      recorder.endWindow();
      recorder.beginWindow(1000);
      recorder.writeTuple(newTuple("speed", "2m/h"), "op1");
      recorder.endWindow();
      recorder.teardown();

      String basePath = recorder.getStorage().getBasePath();
      fs.initialize(new Path(basePath).toUri(), new Configuration());

      // Index file: a single entry describing part0.txt.
      try (FSDataInputStream is = fs.open(new Path(basePath, FSPartFileCollection.INDEX_FILE));
          BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
        String line = br.readLine();
        Assert.assertNotNull("index file should not be empty", line);
        Assert.assertTrue("check index", line
            .matches("F:part0.txt:\\d+-\\d+:4:T:1000-1000:33:\\{\"3\":\"1\",\"1\":\"1\",\"0\":\"1\",\"2\":\"1\"\\}"));
      }

      // Meta file: version line, record info line, then one PortInfo line per port.
      try (FSDataInputStream is = fs.open(new Path(basePath, FSPartFileCollection.META_FILE));
          BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
        ObjectMapper mapper = new ObjectMapper();
        Assert.assertEquals("check version", "1.2", br.readLine());
        br.readLine(); // record info line; its content is not verified here
        for (int i = 1; i <= 4; i++) {
          String label = "port" + i;
          String line = br.readLine();
          Assert.assertNotNull(label + ": line is missing", line);
          PortInfo pi = mapper.readValue(line, PortInfo.class);
          Assert.assertNotNull(label + ": unknown port name " + pi.name, recorder.getPortInfoMap().get(pi.name));
          Assert.assertEquals(label, recorder.getPortInfoMap().get(pi.name).id, pi.id);
          Assert.assertEquals(label, recorder.getPortInfoMap().get(pi.name).type, pi.type);
        }
        Assert.assertEquals("port size", 4, recorder.getPortInfoMap().size());
      }

      // Part file: begin-window line, the four tuples in write order, end-window line.
      try (FSDataInputStream is = fs.open(new Path(basePath, "part0.txt"));
          BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
        assertLine("check part0", br.readLine(), "B:", ":1000");
        assertLine("check part0 1", br.readLine(), "T:", ":0:30:{\"key\":\"speed\",\"value\":\"5m/h\"}");
        assertLine("check part0 2", br.readLine(), "T:", ":2:30:{\"key\":\"speed\",\"value\":\"4m/h\"}");
        assertLine("check part0 3", br.readLine(), "T:", ":1:30:{\"key\":\"speed\",\"value\":\"6m/h\"}");
        assertLine("check part0 4", br.readLine(), "T:", ":3:30:{\"key\":\"speed\",\"value\":\"2m/h\"}");
        assertLine("check part0 5", br.readLine(), "E:", ":1000");
      }
    }
  }

  /**
   * Runs a three-operator DAG in a local cluster and exercises start/stop of tuple recording on
   * the second operator and then on the input operator.
   *
   * @throws Exception on any cluster or verification failure
   */
  @Test
  public void testRecordingFlow() throws Exception
  {
    LogicalPlan dag = new LogicalPlan();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir.getAbsolutePath(), null));
    dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, "file://" + testWorkDir.getAbsolutePath());
    dag.getAttributes().put(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE, 1024); // 1KB per part

    TestGeneratorInputOperator op1 = dag.addOperator("op1", TestGeneratorInputOperator.class);
    GenericTestOperator op2 = dag.addOperator("op2", GenericTestOperator.class);
    GenericTestOperator op3 = dag.addOperator("op3", GenericTestOperator.class);
    op1.setEmitInterval(100); // emit every 100 msec

    dag.addStream("stream1", op1.outport, op2.inport1);
    dag.addStream("stream2", op2.outport1, op3.inport1);

    final StramLocalCluster localCluster = new StramLocalCluster(dag);
    localCluster.runAsync();
    try {
      final PTOperator ptOp2 = localCluster.findByLogicalNode(dag.getMeta(op2));
      StramTestSupport.waitForActivation(localCluster, ptOp2);
      testRecordingOnOperator(localCluster, ptOp2);

      final PTOperator ptOp1 = localCluster.findByLogicalNode(dag.getMeta(op1));
      StramTestSupport.waitForActivation(localCluster, ptOp1);
      testRecordingOnOperator(localCluster, ptOp1);
    } finally {
      // Always stop the asynchronously running cluster, even when an assertion above failed.
      localCluster.shutdown();
    }
  }

  /**
   * Starts recording on {@code op}, waits until enough tuples were recorded, stops recording
   * and then validates the meta, index and part files found in the recording directory.
   *
   * @param localCluster running cluster that hosts the operator
   * @param op operator to record
   * @throws Exception on any I/O, JSON or wait failure
   */
  private void testRecordingOnOperator(final StramLocalCluster localCluster, final PTOperator op) throws Exception
  {
    String id = "xyz";
    localCluster.getStreamingContainerManager().startRecording(id, op.getId(), null, 0);

    WaitCondition c = new WaitCondition()
    {
      @Override
      public boolean isComplete()
      {
        return null != getTupleRecorder(localCluster, op);
      }
    };
    Assert.assertTrue("Should get a tuple recorder within 10 seconds", StramTestSupport.awaitCompletion(c, 10000));

    final TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op);
    long startTime = tupleRecorder.getStartTime();
    File dir = new File(testWorkDir, "recordings/" + op.getId() + "/" + id);

    File metaFile = new File(dir, FSPartFileCollection.META_FILE);
    Assert.assertTrue("meta file should exist", metaFile.exists());
    int numPorts = tupleRecorder.getSinkMap().size();
    verifyMetaFile(metaFile, startTime, numPorts);

    c = new WaitCondition()
    {
      @Override
      public boolean isComplete()
      {
        return (tupleRecorder.getTotalTupleCount() >= testTupleCount);
      }
    };
    Assert.assertTrue("Should record more than " + testTupleCount + " tuples within 15 seconds", StramTestSupport.awaitCompletion(c, 15000));

    localCluster.getStreamingContainerManager().stopRecording(op.getId(), null);
    c = new WaitCondition()
    {
      @Override
      public boolean isComplete()
      {
        return getTupleRecorder(localCluster, op) == null;
      }
    };
    Assert.assertTrue("Tuple recorder shouldn't exist any more after stopping", StramTestSupport.awaitCompletion(c, 5000));

    File indexFile = new File(dir, FSPartFileCollection.INDEX_FILE);
    Assert.assertTrue("index file should exist", indexFile.exists());
    ArrayList<String> partFiles = readPartFileNames(indexFile);

    int[] tupleCount = new int[numPorts];
    boolean beginWindowExists = false;
    boolean endWindowExists = false;
    int lastIndex = partFiles.size() - 1;
    for (int i = 0; i <= lastIndex; i++) {
      String partFile = partFiles.get(i);
      File file = new File(dir, partFile);
      // Check existence first so a missing file is not reported as a size problem.
      Assert.assertTrue(partFile + " should exist", file.exists());
      if (i < lastIndex) {
        // Only the last part may be shorter than the configured part size.
        Assert.assertTrue(partFile + " should be greater than 1KB", file.length() >= 1024);
      }
      try (BufferedReader br = new BufferedReader(new FileReader(file))) {
        String line;
        while ((line = br.readLine()) != null) {
          if (line.startsWith("B:")) {
            beginWindowExists = true;
          } else if (line.startsWith("E:")) {
            endWindowExists = true;
          } else if (line.startsWith("T:")) {
            // The third colon-separated field is used as the port index.
            String[] parts = line.split(":");
            int portId = Integer.parseInt(parts[2]);
            Assert.assertTrue("unexpected port id in line: " + line, portId >= 0 && portId < numPorts);
            tupleCount[portId]++;
          }
        }
      }
    }
    Assert.assertTrue("begin window should exist", beginWindowExists);
    Assert.assertTrue("end window should exist", endWindowExists);

    int sum = 0;
    for (int i = 0; i < numPorts; i++) {
      Assert.assertTrue("tuple exists for port " + i, tupleCount[i] > 0);
      sum += tupleCount[i];
    }
    Assert.assertTrue("total tuple count >= " + testTupleCount, sum >= testTupleCount);
  }

  /**
   * Checks the meta file: version line, a JSON line carrying {@code startTime}, then one line
   * per port containing the name, streamName, type and id keys.
   */
  private static void verifyMetaFile(File metaFile, long startTime, int numPorts) throws Exception
  {
    try (BufferedReader br = new BufferedReader(new FileReader(metaFile))) {
      String line = br.readLine();
      Assert.assertEquals("version should be 1.2", "1.2", line);
      line = br.readLine();
      Assert.assertNotNull("record info line should exist", line);
      JSONObject json = new JSONObject(line);
      Assert.assertEquals("Start time verification", startTime, json.getLong("startTime"));
      Assert.assertTrue(numPorts > 0);
      for (int i = 0; i < numPorts; i++) {
        line = br.readLine();
        Assert.assertTrue("should contain name, streamName, type and id", line != null && line
            .contains("\"name\"") && line.contains("\"streamName\"") && line.contains("\"type\"") && line
            .contains("\"id\""));
      }
    }
  }

  /**
   * Reads the index file and returns the part file names it lists. Entries must be
   * {@code F:part<N>.txt:} lines numbered consecutively from 0; a line starting with {@code E}
   * must be the last line of the file.
   */
  private static ArrayList<String> readPartFileNames(File indexFile) throws IOException
  {
    ArrayList<String> partFiles = new ArrayList<>();
    try (BufferedReader br = new BufferedReader(new FileReader(indexFile))) {
      String line;
      while ((line = br.readLine()) != null) {
        // The next expected part number equals the number of parts seen so far.
        String partFile = "part" + partFiles.size() + ".txt";
        if (line.startsWith("F:" + partFile + ":")) {
          partFiles.add(partFile);
        } else if (line.startsWith("E")) {
          Assert.assertNull("index file should end after E line", br.readLine());
          break;
        } else {
          Assert.fail("index file line is not starting with F or E");
        }
      }
    }
    return partFiles;
  }
}