blob: 0fd01fd2a666619d3c3fc04bef4e52ef6c773d9d [file] [log] [blame]
/*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.ksql;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Stubber;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
public class KSQLInterpreterTest {
private InterpreterContext context;
private static final Map<String, String> PROPS = new HashMap<String, String>() {{
put("ksql.url", "http://localhost:8088");
put("ksql.streams.auto.offset.reset", "earliest");
}};
@Before
public void setUpZeppelin() throws IOException {
context = InterpreterContext.builder()
.setInterpreterOut(new InterpreterOutput())
.setParagraphId("ksql-test")
.build();
}
@Test
public void shouldRenderKSQLSelectAsTable() throws InterpreterException,
IOException, InterruptedException {
// given
Properties p = new Properties();
p.putAll(PROPS);
KSQLRestService service = Mockito.mock(KSQLRestService.class);
Stubber stubber = Mockito.doAnswer((invocation) -> {
Consumer< KSQLResponse> callback = (Consumer< KSQLResponse>)
invocation.getArguments()[2];
IntStream.range(1, 5)
.forEach(i -> {
Map<String, Object> map = new HashMap<>();
if (i == 4) {
map.put("row", null);
map.put("terminal", true);
} else {
map.put("row", Collections.singletonMap("columns", Arrays.asList("value " + i)));
map.put("terminal", false);
}
callback.accept(new KSQLResponse(Arrays.asList("fieldName"), map));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
return null;
});
stubber.when(service).executeQuery(Mockito.any(String.class),
Mockito.anyString(),
Mockito.any(Consumer.class));
Interpreter interpreter = new KSQLInterpreter(p, service);
// when
String query = "select * from orders";
interpreter.interpret(query, context);
// then
String expected = "%table fieldName\n" +
"value 1\n" +
"value 2\n" +
"value 3\n";
context.out.flush();
assertEquals(1, context.out.toInterpreterResultMessage().size());
assertEquals(expected, context.out.toInterpreterResultMessage().get(0).toString());
assertEquals(InterpreterResult.Type.TABLE, context.out
.toInterpreterResultMessage().get(0).getType());
interpreter.close();
}
@Test
public void shouldRenderKSQLNonSelectAsTable() throws InterpreterException,
IOException, InterruptedException {
// given
Properties p = new Properties();
p.putAll(PROPS);
KSQLRestService service = Mockito.mock(KSQLRestService.class);
Map<String, Object> row1 = new HashMap<>();
row1.put("name", "orders");
row1.put("registered", "false");
row1.put("replicaInfo", "[1]");
row1.put("consumerCount", "0");
row1.put("consumerGroupCount", "0");
Map<String, Object> row2 = new HashMap<>();
row2.put("name", "orders");
row2.put("registered", "false");
row2.put("replicaInfo", "[1]");
row2.put("consumerCount", "0");
row2.put("consumerGroupCount", "0");
Stubber stubber = Mockito.doAnswer((invocation) -> {
Consumer< KSQLResponse> callback = (Consumer< KSQLResponse>)
invocation.getArguments()[2];
callback.accept(new KSQLResponse(row1));
callback.accept(new KSQLResponse(row2));
return null;
});
stubber.when(service).executeQuery(
Mockito.any(String.class),
Mockito.anyString(),
Mockito.any(Consumer.class));
Interpreter interpreter = new KSQLInterpreter(p, service);
// when
String query = "show topics";
interpreter.interpret(query, context);
// then
List<Map<String, Object>> expected = Arrays.asList(row1, row2);
context.out.flush();
String[] lines = context.out.toInterpreterResultMessage()
.get(0).toString()
.replace("%table ", "")
.trim()
.split("\n");
List<String[]> rows = Stream.of(lines)
.map(line -> line.split("\t"))
.collect(Collectors.toList());
List<Map<String, String>> actual = rows.stream()
.skip(1)
.map(row -> IntStream.range(0, row.length)
.mapToObj(index -> new AbstractMap.SimpleEntry<>(rows.get(0)[index], row[index]))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())))
.collect(Collectors.toList());
assertEquals(1, context.out.toInterpreterResultMessage().size());
assertEquals(expected, actual);
assertEquals(InterpreterResult.Type.TABLE, context.out
.toInterpreterResultMessage().get(0).getType());
}
}