blob: bc96ad2e269c8c056eabd9ecd80830cece4a5c34 [file] [log] [blame]
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
package org.apache.edgent.test.connectors.command;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assume.assumeTrue;
import java.lang.ProcessBuilder.Redirect;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.edgent.connectors.command.CommandStreams;
import org.apache.edgent.test.connectors.common.FileUtil;
import org.apache.edgent.test.connectors.common.TestRepoPath;
import org.apache.edgent.topology.TSink;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.tester.Condition;
import org.junit.Test;
public class CommandStreamsTest extends DirectTopologyTestBase {
private String[] stdLines = new String[] {
"Line 1",
"Line 2",
"Line 3",
private boolean isWindows() {
return System.getProperty("").contains("Windows");
private String[] mkCatFileCmd(String path) {
if (isWindows()) {
return new String[] {"cmd", "/c", "type", path};
else {
return new String[] {"cat", path};
private String[] mkCatStdInOutCmd() {
if (isWindows()) {
return new String[] {"cmd", "/c", "findstr", ".*"};
else {
return new String[] {"cat"};
public String[] getLines() {
return stdLines;
private static void delay(long millis) {
try {
catch (InterruptedException e) { }
public void testTokenize() {
String cmdString = "myCmd arg1 arg2\targ3";
String[] expected = new String[]{"myCmd", "arg1", "arg2", "arg3"};
assertArrayEquals(expected, CommandStreams.tokenize(cmdString).toArray(new String[0]));
public void testPeriodicSource() throws Exception {
Topology t = newTopology("testPeriodicSource");
Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
System.out.println("Test: "+t.getName()+" "+tempFile1);
ProcessBuilder cmd = new ProcessBuilder(mkCatFileCmd(tempFile1.toString()));
int NUM_POLLS = 3;
List<String> expLines = new ArrayList<>();
for (int i = 0; i < NUM_POLLS; i++) {
TStream<List<String>> ls = CommandStreams.periodicSource(t, cmd, 1, TimeUnit.SECONDS);
TStream<String> s = ls.flatMap(list -> list);
try {
completeAndValidate("", t, s, 10, expLines.toArray(new String[0]));
finally {
public void testGenerate() throws Exception {
Topology t = newTopology("testGenerate");
Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
System.out.println("Test: "+t.getName()+" "+tempFile1);
ProcessBuilder cmd = new ProcessBuilder(mkCatFileCmd(tempFile1.toString()));
// N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
TStream<String> s = CommandStreams.generate(t, cmd);
try {
completeAndValidate("", t, s, 10, getLines());
finally {
public void testGenerateRestart() throws Exception {
Topology t = newTopology("testGenerateRestart");
Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
System.out.println("Test: "+t.getName()+" "+tempFile1);
ProcessBuilder cmd = new ProcessBuilder(mkCatFileCmd(tempFile1.toString()));
int NUM_RUNS = 3;
List<String> expLines = new ArrayList<>();
for (int i = 0; i < NUM_RUNS; i++) {
// N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
TStream<String> s = CommandStreams.generate(t, cmd);
completeAndValidate("", t, s, 10 + ((NUM_RUNS-1) * 1/*restart delay time*/), expLines.toArray(new String[0]));
public void testSink() throws Exception {
Topology t = newTopology("testSink");
Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
System.out.println("Test: "+t.getName()+" "+tempFile1);
ProcessBuilder cmd = new ProcessBuilder(mkCatStdInOutCmd())
TStream<String> s = t.strings(getLines());
TSink<String> sink = CommandStreams.sink(s, cmd);
try {
// complete when the sink has generated the expected results
Condition<Object> tc = new Condition<Object>() {
public boolean valid() {
try {
return FileUtil.validateFile(tempFile1, getLines(), true);
} catch (Exception e) {
return false;
public Object getResult() { return "todo-files-lines"; }
// If we time out, still validate content to see what we did get
try {
complete(t, tc, 3, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("test time out");
FileUtil.validateFile(tempFile1, getLines());
finally {
public void testSinkRestart() throws Exception {
Topology t = newTopology("testSinkRestart");
// until someone cares enough to create Win version of sinkcmd
Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
System.out.println("Test: "+t.getName()+" "+tempFile1);
int batchSize = getLines().length;
// tell cmd to terminate after each batch of lines
ProcessBuilder cmd = new ProcessBuilder("sh", getCmdPath("sinkcmd"), ""+batchSize)
.redirectError( File("/dev/stderr")));
int NUM_RUNS = 3;
List<String> expLines = new ArrayList<>();
for (int i = 0; i < NUM_RUNS; i++) {
AtomicInteger cnt = new AtomicInteger();
TStream<String> s = t.strings(expLines.toArray(new String[0]))
.filter(tup -> {
// need to slow things down so the sinker has time to notice
// the cmd has terminated. otherwise we'll get ahead,
// tuples will get dropped on the floor and validation will fail.
if (cnt.incrementAndGet() > batchSize) {
// System.out.println("SLEEPING on cnt "+ cnt.get() + " for "+tup);
return true;
TSink<String> sink = CommandStreams.sink(s, cmd);
try {
// complete when the sink has generated the expected results
Condition<Object> tc = new Condition<Object>() {
public boolean valid() {
try {
return FileUtil.validateFile(tempFile1, expLines.toArray(new String[0]), true);
} catch (Exception e) {
return false;
public Object getResult() { return "todo-files-lines"; }
// If we time out, still validate content to see what we did get
try {
complete(t, tc, 6 + ((NUM_RUNS-1) * 1/*restart delay*/), TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("test time out");
FileUtil.validateFile(tempFile1, expLines.toArray(new String[0]));
finally {
private String getCmdPath(String cmd) {
return TestRepoPath.getPath(cmd);