[ZEPPELIN-3548] KSQL Interpreter for Zeppelin
### What is this PR for?
This PR adds the support to [KSQL](https://docs.confluent.io/current/ksql/docs/developer-guide/api.html)
### What type of PR is it?
[Feature]
### Todos
* [x] - Created the interpreter
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-3548
### How should this be tested?
You can use the following guide in order to spin-up the KSQL stack.
https://github.com/confluentinc/demo-scene/blob/master/ksql-intro/demo_ksql-intro.adoc
### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No, I didn't add any new dependency into the project.
* Is there breaking changes for older versions? No
* Does this needs documentation? Yes
Author: Andrea Santurbano <santand@gmail.com>
Closes #3485 from conker84/ksql and squashes the following commits:
baa480f32 [Andrea Santurbano] PR review feedback
ad3785404 [Andrea Santurbano] [ZEPPELIN-3548] KSQL Interpreter for Zeppelin
diff --git a/docs/assets/themes/zeppelin/img/docs-img/ksql.1.png b/docs/assets/themes/zeppelin/img/docs-img/ksql.1.png
new file mode 100644
index 0000000..276602b
--- /dev/null
+++ b/docs/assets/themes/zeppelin/img/docs-img/ksql.1.png
Binary files differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/ksql.2.png b/docs/assets/themes/zeppelin/img/docs-img/ksql.2.png
new file mode 100644
index 0000000..e741503
--- /dev/null
+++ b/docs/assets/themes/zeppelin/img/docs-img/ksql.2.png
Binary files differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/ksql.3.png b/docs/assets/themes/zeppelin/img/docs-img/ksql.3.png
new file mode 100644
index 0000000..abb44a4
--- /dev/null
+++ b/docs/assets/themes/zeppelin/img/docs-img/ksql.3.png
Binary files differ
diff --git a/docs/interpreter/ksql.md b/docs/interpreter/ksql.md
new file mode 100644
index 0000000..bc91ade
--- /dev/null
+++ b/docs/interpreter/ksql.md
@@ -0,0 +1,78 @@
+---
+layout: page
+title: "KSQL Interpreter for Apache Zeppelin"
+description: "SQL is the streaming SQL engine for Apache Kafka and provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka."
+group: interpreter
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+{% include JB/setup %}
+
+# KSQL Interpreter for Apache Zeppelin
+
+<div id="toc"></div>
+
+## Overview
+[KSQL](https://www.confluent.io/product/ksql/) is the streaming SQL engine for Apache Kafka®. It provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka,
+
+## Configuration
+<table class="table-configuration">
+ <thead>
+ <tr>
+ <th>Property</th>
+ <th>Default</th>
+ <th>Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>ksql.url</td>
+ <td>http://localhost:8080</td>
+ <td>The KSQL Endpoint base URL</td>
+ </tr>
+ </tbody>
+</table>
+
+N.b. The interpreter supports all the KSQL properties, i.e. `ksql.streams.auto.offset.reset`.
+The full list of KSQL parameters is [here](https://docs.confluent.io/current/ksql/docs/installation/server-config/config-reference.html).
+
+## Using the KSQL Interpreter
+In a paragraph, use `%ksql` and start your SQL query in order to start to interact with KSQL.
+
+Following some examples:
+
+```
+%ksql
+PRINT 'orders';
+```
+
+
+
+```
+%ksql
+CREATE STREAM ORDERS WITH
+ (VALUE_FORMAT='AVRO',
+ KAFKA_TOPIC ='orders');
+```
+
+
+
+```
+%ksql
+SELECT *
+FROM ORDERS
+LIMIT 10
+```
+
+
\ No newline at end of file
diff --git a/ksql/README.md b/ksql/README.md
new file mode 100644
index 0000000..22f89f2
--- /dev/null
+++ b/ksql/README.md
@@ -0,0 +1,10 @@
+# Overview
+KSQL interpreter for Apache Zeppelin
+
+# Connection
+The Interpreter opens a connection with the KSQL REST endpoint.
+
+# Confluent KSQL resources
+Following a list of useful resources:
+ * [Docs](https://docs.confluent.io/current/ksql/docs/index.html)
+ * [Getting Started](https://github.com/confluentinc/demo-scene/blob/master/ksql-intro/demo_ksql-intro.adoc)
diff --git a/ksql/pom.xml b/ksql/pom.xml
new file mode 100644
index 0000000..b3b25b1
--- /dev/null
+++ b/ksql/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zeppelin-interpreter-parent</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-ksql</artifactId>
+ <packaging>jar</packaging>
+ <version>0.9.0-SNAPSHOT</version>
+ <name>Zeppelin: Kafka SQL interpreter</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <interpreter.name>ksql</interpreter.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.9.8</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>1.3.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java b/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java
new file mode 100644
index 0000000..72e41db
--- /dev/null
+++ b/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.ksql;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class BasicKSQLHttpClient implements Closeable {
+
+ public static final String UTF_8 = "utf-8";
+
+ interface BasicHTTPClientResponse {
+ void onMessage(int status, String message);
+
+ void onError(int status, String message);
+ }
+
+ private final String jsonData;
+ private final Map<String, Object> formData;
+ private final String type;
+ private final Map<String, String> headers;
+ private final URL url;
+ private HttpURLConnection connection;
+ private final int timeout;
+ private boolean connected;
+
+
+ public BasicKSQLHttpClient(String url, String jsonData, Map<String, Object> formData,
+ String type, Map<String, String> headers, int timeout)
+ throws IOException {
+ this.url = new URL(url);
+ this.jsonData = jsonData;
+ this.formData = formData;
+ this.type = type;
+ this.headers = headers;
+ this.timeout = timeout;
+ this.connected = false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ connected = false;
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+
+ private void writeOutput(String data) throws IOException {
+ try (OutputStream os = connection.getOutputStream()) {
+ byte[] input = data.getBytes(UTF_8);
+ os.write(input);
+ }
+ }
+
+ public String connect() throws IOException {
+ int status = createConnection();
+ boolean isStatusOk = isStatusOk(status);
+ return IOUtils.toString(isStatusOk ?
+ connection.getInputStream() : connection.getErrorStream(), UTF_8);
+ }
+
+ public void connectAsync(BasicHTTPClientResponse onResponse) throws IOException {
+ int status = createConnection();
+ boolean isStatusOk = isStatusOk(status);
+ long start = System.currentTimeMillis();
+
+ try (InputStreamReader in = new InputStreamReader(connection.getInputStream(), UTF_8);
+ BufferedReader br = new BufferedReader(in)) {
+ while (connected && (timeout == -1 || System.currentTimeMillis() - start < timeout)) {
+ if (br.ready()) {
+ String responseLine = br.readLine();
+ if (responseLine == null || responseLine.isEmpty()) {
+ continue;
+ }
+ if (isStatusOk) {
+ onResponse.onMessage(status, responseLine.trim());
+ } else {
+ onResponse.onError(status, responseLine.trim());
+ }
+ }
+ }
+ }
+ }
+
+ private boolean isStatusOk(int status) {
+ return status >= 200 && status < 300;
+ }
+
+ private int createConnection() throws IOException {
+ this.connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod(this.type);
+ this.headers.forEach((k, v) -> connection.setRequestProperty(k, v));
+ connection.setDoOutput(true);
+ if (jsonData != null && !jsonData.isEmpty()) {
+ writeOutput(jsonData);
+ } else if (formData != null && !formData.isEmpty()) {
+ String queryStringParams = formData.entrySet()
+ .stream()
+ .map(e -> e.getKey() + "=" + e.getValue())
+ .collect(Collectors.joining("&"));
+ writeOutput(queryStringParams);
+ }
+ connected = true;
+ return connection.getResponseCode();
+ }
+
+ static class Builder {
+ private String url;
+ private String json;
+ private Map<String, Object> formData = new HashMap<>();
+ private String type;
+ private Map<String, String> headers = new HashMap<>();
+ private int timeout = -1;
+
+ public Builder withTimeout(int timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ public Builder withUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public Builder withJson(String json) {
+ this.json = json;
+ return this;
+ }
+
+ public Builder withType(String type) {
+ this.type = type;
+ return this;
+ }
+
+ public Builder withHeader(String header, String value) {
+ this.headers.put(header, value);
+ return this;
+ }
+
+ public Builder withFormData(String name, Object value) {
+ this.formData.put(name, value);
+ return this;
+ }
+
+ public BasicKSQLHttpClient build() throws IOException {
+ return new BasicKSQLHttpClient(url, json, formData, type, headers, timeout);
+ }
+
+ }
+}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java
new file mode 100644
index 0000000..461b97a
--- /dev/null
+++ b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java
@@ -0,0 +1,169 @@
+/*
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.zeppelin.ksql;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+
+public class KSQLInterpreter extends Interpreter {
+ private static final String NEW_LINE = "\n";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KSQLInterpreter.class);
+ public static final String TABLE_DELIMITER = "\t";
+
+ private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
+
+ private final KSQLRestService ksqlRestService;
+
+ private static final ObjectMapper json = new ObjectMapper();
+
+ public KSQLInterpreter(Properties properties) {
+ this(properties, new KSQLRestService(properties.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey().toString(),
+ e -> e.getValue() != null ? e.getValue().toString() : null))));
+ }
+
+ // VisibleForTesting
+ public KSQLInterpreter(Properties properties, KSQLRestService ksqlRestService) {
+ super(properties);
+ this.ksqlRestService = ksqlRestService;
+ }
+
+ @Override
+ public void open() throws InterpreterException {}
+
+ @Override
+ public void close() throws InterpreterException {
+ ksqlRestService.close();
+ }
+
+ private String writeValueAsString(Object data) {
+ try {
+ if (data instanceof Collection || data instanceof Map) {
+ return json.writeValueAsString(data);
+ }
+ if (data instanceof String) {
+ return (String) data;
+ }
+ return String.valueOf(data);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void checkResponseErrors(String message) throws IOException {
+ if (StringUtils.isNotBlank(message)) {
+ // throw new RuntimeException(message);
+ interpreterOutput.getInterpreterOutput().write("%text");
+ interpreterOutput.getInterpreterOutput().write(NEW_LINE);
+ interpreterOutput.getInterpreterOutput().write(message);
+ }
+ }
+
+ @Override
+ public InterpreterResult interpret(String query,
+ InterpreterContext context) throws InterpreterException {
+ if (StringUtils.isBlank(query)) {
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+ }
+ interpreterOutput.setInterpreterOutput(context.out);
+ try {
+ interpreterOutput.getInterpreterOutput().flush();
+ interpreterOutput.getInterpreterOutput().write("%table");
+ interpreterOutput.getInterpreterOutput().write(NEW_LINE);
+ Set<String> header = new LinkedHashSet<>();
+ executeQuery(context.getParagraphId(), query.trim(), header);
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+ } catch (IOException e) {
+ return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
+ }
+ }
+
+ private void executeQuery(final String paragraphId,
+ final String query, Set<String> header) throws IOException {
+ AtomicBoolean isFirstLine = new AtomicBoolean(true);
+ ksqlRestService
+ .executeQuery(paragraphId, query, (resp) -> {
+ try {
+ if (resp.getRow() == null || resp.getRow().isEmpty()) {
+ return;
+ }
+ if (isFirstLine.get()) {
+ isFirstLine.set(false);
+ header.addAll(resp.getRow().keySet());
+ interpreterOutput.getInterpreterOutput().write(header.stream()
+ .collect(Collectors.joining(TABLE_DELIMITER)));
+ interpreterOutput.getInterpreterOutput().write(NEW_LINE);
+ }
+ interpreterOutput.getInterpreterOutput().write(resp.getRow().values().stream()
+ .map(this::writeValueAsString)
+ .collect(Collectors.joining(TABLE_DELIMITER)));
+ interpreterOutput.getInterpreterOutput().write(NEW_LINE);
+ checkResponseErrors(resp.getFinalMessage());
+ checkResponseErrors(resp.getErrorMessage());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) throws InterpreterException {
+ logger.info("Trying to cancel paragraphId {}", context.getParagraphId());
+ try {
+ ksqlRestService.closeClient(context.getParagraphId());
+ logger.info("Removed");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public FormType getFormType() throws InterpreterException {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) throws InterpreterException {
+ return 0;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+ KSQLInterpreter.class.getName() + this.hashCode());
+ }
+}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java
new file mode 100644
index 0000000..a05a70d
--- /dev/null
+++ b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.ksql;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+public class KSQLRequest {
+
+ private static final String EXPLAIN_QUERY = "EXPLAIN %s";
+ private final String ksql;
+ private final Map<String, String> streamsProperties;
+
+ KSQLRequest(final String ksql, final Map<String, String> streamsProperties) {
+ String inputQuery = Objects.requireNonNull(ksql, "ksql")
+ .replaceAll("[\\n\\t\\r]", " ")
+ .trim();
+ this.ksql = inputQuery.endsWith(";") ? inputQuery : inputQuery + ";";
+ this.streamsProperties = streamsProperties;
+ }
+
+ KSQLRequest(final String ksql) {
+ this(ksql, Collections.emptyMap());
+ }
+
+ KSQLRequest toExplainRequest() {
+ return new KSQLRequest(String.format(EXPLAIN_QUERY, this.ksql), this.streamsProperties);
+ }
+
+ public String getKsql() {
+ return ksql;
+ }
+
+ public Map<String, String> getStreamsProperties() {
+ return streamsProperties;
+ }
+}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java
new file mode 100644
index 0000000..4646135
--- /dev/null
+++ b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.ksql;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class KSQLResponse {
+ private final Map<String, Object> row;
+ private final String finalMessage;
+ private final String errorMessage;
+ private final boolean terminal;
+
+ private <T, K, U> Collector<T, ?, Map<K, U>>
+ toLinkedHashMap(Function<? super T, ? extends K> keyMapper,
+ Function<? super T, ? extends U> valueMapper) {
+ return Collectors.toMap(
+ keyMapper,
+ valueMapper,
+ (u, v) -> { throw new IllegalStateException(String.format("Duplicate key %s", u)); },
+ LinkedHashMap::new);
+ }
+
+ KSQLResponse(final List<String> fields, final Map<String, Object> row,
+ final String finalMessage, final String errorMessage, boolean terminal) {
+ List<Object> columns = row == null ? null : (List<Object>) row.getOrDefault("columns",
+ Collections.emptyList());
+ this.row = row == null ? null : IntStream.range(0, columns.size())
+ .mapToObj(index -> new AbstractMap.SimpleEntry<>(fields.get(index),
+ columns.get(index)))
+ .collect(toLinkedHashMap(e -> e.getKey(), e -> e.getValue()));
+ this.finalMessage = finalMessage;
+ this.errorMessage = errorMessage;
+ this.terminal = terminal;
+ }
+
+ KSQLResponse(final List<String> fields, final Map<String, Object> resp) {
+ this(fields, (Map<String, Object>) resp.get("row"),
+ (String) resp.get("finalMessage"),
+ (String) resp.get("errorMessage"),
+ (boolean) resp.get("terminal"));
+ }
+
+ KSQLResponse(final Map<String, Object> resp) {
+ this.row = resp;
+ this.finalMessage = null;
+ this.errorMessage = null;
+ this.terminal = true;
+ }
+
+ public Map<String, Object> getRow() {
+ return row;
+ }
+
+ public String getFinalMessage() {
+ return finalMessage;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public boolean isTerminal() {
+ return terminal;
+ }
+}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java
new file mode 100644
index 0000000..3393330
--- /dev/null
+++ b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.ksql;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class KSQLRestService {
+
+ private static final String KSQL_ENDPOINT = "%s/ksql";
+ private static final String QUERY_ENDPOINT = "%s/query";
+
+ private static final String KSQL_V1_CONTENT_TYPE = "application/vnd.ksql.v1+json; charset=utf-8";
+
+ private static final List<String> KSQL_COMMON_FIELDS = Arrays
+ .asList("statementText", "warnings", "@type");
+ private static final String KSQL_URL = "ksql.url";
+
+ private static final ObjectMapper json = new ObjectMapper();
+
+ private final String ksqlUrl;
+ private final String queryUrl;
+ private final String baseUrl;
+ private final Map<String, String> streamsProperties;
+
+ private final Map<String, BasicKSQLHttpClient> clientCache;
+
+ public KSQLRestService(Map<String, String> props) {
+ baseUrl = Objects.requireNonNull(props.get(KSQL_URL), KSQL_URL).toString();
+ ksqlUrl = String.format(KSQL_ENDPOINT, baseUrl);
+ queryUrl = String.format(QUERY_ENDPOINT, baseUrl);
+ clientCache = new ConcurrentHashMap<>();
+ this.streamsProperties = props.entrySet().stream()
+ .filter(e -> e.getKey().startsWith("ksql.") && !e.getKey().equals(KSQL_URL))
+ .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+ }
+
+
+ public void executeQuery(final String paragraphId, final String query,
+ final Consumer<KSQLResponse> callback) throws IOException {
+ KSQLRequest request = new KSQLRequest(query, streamsProperties);
+ if (isSelect(request)) {
+ executeSelect(paragraphId, callback, request);
+ } else if (isPrint(request)) {
+ executePrint(paragraphId, callback, request);
+ } else {
+ executeKSQL(paragraphId, callback, request);
+ }
+ }
+
+ private void executeKSQL(String paragraphId, Consumer<KSQLResponse> callback,
+ KSQLRequest request) throws IOException {
+ try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, ksqlUrl)) {
+ List<Map<String, Object>> queryResponse = json.readValue(client.connect(), List.class);
+ queryResponse.stream()
+ .map(map -> excludeKSQLCommonFields(map))
+ .flatMap(map -> map.entrySet().stream()
+ .filter(e -> e.getValue() instanceof List)
+ .flatMap(e -> ((List<Map<String, Object>>) e.getValue()).stream()))
+ .map(KSQLResponse::new)
+ .forEach(callback::accept);
+ queryResponse.stream()
+ .map(map -> excludeKSQLCommonFields(map))
+ .flatMap(map -> map.entrySet().stream()
+ .filter(e -> e.getValue() instanceof Map)
+ .map(e -> (Map<String, Object>) e.getValue()))
+ .map(KSQLResponse::new)
+ .forEach(callback::accept);
+ }
+ }
+
+ private Map<String, Object> excludeKSQLCommonFields(Map<String, Object> map) {
+ return map.entrySet().stream()
+ .filter(e -> !KSQL_COMMON_FIELDS.contains(e.getKey()))
+ .collect(Collectors
+ .toMap(e -> e.getKey(), e -> e.getValue()));
+ }
+
+ private BasicKSQLHttpClient createNewClient(String paragraphId, KSQLRequest request,
+ String url) throws IOException {
+ BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder()
+ .withUrl(url)
+ .withJson(json.writeValueAsString(request))
+ .withType("POST")
+ .withHeader("Content-type", KSQL_V1_CONTENT_TYPE)
+ .build();
+ BasicKSQLHttpClient oldClient = clientCache.put(paragraphId, client);
+ if (oldClient != null) {
+ oldClient.close();
+ }
+ return client;
+ }
+
+ private void executeSelect(String paragraphId, Consumer<KSQLResponse> callback,
+ KSQLRequest request) throws IOException {
+ List<String> fieldNames = getFields(request);
+ if (fieldNames.isEmpty()) {
+ throw new RuntimeException("Field are empty");
+ }
+ try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, queryUrl)) {
+ client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() {
+ @Override
+ public void onMessage(int status, String message) {
+ try {
+ Map<String, Object> queryResponse = json.readValue(message, LinkedHashMap.class);
+ KSQLResponse resp = new KSQLResponse(fieldNames, queryResponse);
+ callback.accept(resp);
+ if (resp.isTerminal() || StringUtils.isNotBlank(resp.getErrorMessage())
+ || StringUtils.isNotBlank(resp.getFinalMessage())) {
+ client.close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onError(int status, String message) {
+ try {
+ KSQLResponse resp = new KSQLResponse(Collections.singletonMap("error", message));
+ callback.accept(resp);
+ client.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+ }
+
+ private void executePrint(String paragraphId, Consumer<KSQLResponse> callback,
+ KSQLRequest request) throws IOException {
+ try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, queryUrl)) {
+ client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() {
+ @Override
+ public void onMessage(int status, String message) {
+ if (message.toUpperCase().startsWith("FORMAT:")) {
+ return;
+ }
+ List<String> elements = Arrays.asList(message.split(","));
+ Map<String, Object> row = new LinkedHashMap<>();
+ row.put("timestamp", elements.get(0));
+ row.put("offset", elements.get(1));
+ row.put("record", String.join("", elements.subList(2, elements.size())));
+ KSQLResponse resp = new KSQLResponse(row);
+ callback.accept(resp);
+ }
+
+ @Override
+ public void onError(int status, String message) {
+ try {
+ KSQLResponse resp = new KSQLResponse(Collections.singletonMap("error", message));
+ callback.accept(resp);
+ client.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+ }
+
+ private boolean isSelect(KSQLRequest request) {
+ return request.getKsql().toUpperCase().startsWith("SELECT");
+ }
+
+ private boolean isPrint(KSQLRequest request) {
+ return request.getKsql().toUpperCase().startsWith("PRINT");
+ }
+
+ public void closeClient(final String paragraphId) throws IOException {
+ BasicKSQLHttpClient toClose = clientCache.remove(paragraphId);
+ if (toClose != null) {
+ toClose.close();
+ }
+ }
+
+ private List<String> getFields(KSQLRequest request) throws IOException {
+ return getFields(request, false);
+ }
+
+ private List<String> getFields(KSQLRequest request, boolean tryCoerce) throws IOException {
+ if (tryCoerce) {
+ /*
+ * this because a query like
+ * `EXPLAIN SELECT * FROM ORDERS WHERE ADDRESS->STATE = 'New York' LIMIT 10;`
+ * fails with the message `Column STATE cannot be resolved`
+ * so we try to coerce the field resolution
+ */
+ String query = request.getKsql()
+ .substring(0, request.getKsql().toUpperCase().indexOf("WHERE"));
+ request = new KSQLRequest(query, request.getStreamsProperties());
+ }
+ try (BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder()
+ .withUrl(ksqlUrl)
+ .withJson(json.writeValueAsString(request.toExplainRequest()))
+ .withType("POST")
+ .withHeader("Content-type", KSQL_V1_CONTENT_TYPE)
+ .build()) {
+ List<Map<String, Object>> explainResponseList = json.readValue(client.connect(), List.class);
+ Map<String, Object> explainResponse = explainResponseList.get(0);
+ Map<String, Object> queryDescription = (Map<String, Object>) explainResponse
+ .getOrDefault("queryDescription", Collections.emptyMap());
+ List<Map<String, Object>> fields = (List<Map<String, Object>>) queryDescription
+ .getOrDefault("fields", Collections.emptyList());
+ return fields.stream()
+ .map(elem -> elem.getOrDefault("name", "").toString())
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ if (!tryCoerce) {
+ return getFields(request, true);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+
+ public void close() {
+ Set<String> keys = clientCache.keySet();
+ keys.forEach(key -> {
+ try {
+ closeClient(key);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+}
diff --git a/ksql/src/main/resources/interpreter-setting.json b/ksql/src/main/resources/interpreter-setting.json
new file mode 100644
index 0000000..cf15bbf
--- /dev/null
+++ b/ksql/src/main/resources/interpreter-setting.json
@@ -0,0 +1,21 @@
+[
+ {
+ "group": "ksql",
+ "name": "ksql",
+ "className": "org.apache.zeppelin.ksql.KSQLInterpreter",
+ "properties": {
+ "ksql.url": {
+ "envName": null,
+ "propertyName": "ksql.url",
+ "defaultValue": "http://localhost:8088",
+ "description": "KSQL Endpoint base URL",
+ "type": "string"
+ }
+ },
+ "editor": {
+ "language": "sql",
+ "editOnDblClick": false,
+ "completionSupport": false
+ }
+ }
+]
diff --git a/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java b/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java
new file mode 100644
index 0000000..b9520fb
--- /dev/null
+++ b/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java
@@ -0,0 +1,169 @@
+/*
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.zeppelin.ksql;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Stubber;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+
+public class KSQLInterpreterTest {
+
+ private InterpreterContext context;
+
+ private static final Map<String, String> PROPS = new HashMap<String, String>() {{
+ put("ksql.url", "http://localhost:8088");
+ put("ksql.streams.auto.offset.reset", "earliest");
+ }};
+
+
+ @Before
+ public void setUpZeppelin() throws IOException {
+ context = InterpreterContext.builder()
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setParagraphId("ksql-test")
+ .build();
+ }
+
+ @Test
+ public void shouldRenderKSQLSelectAsTable() throws InterpreterException,
+ IOException, InterruptedException {
+ // given
+ Properties p = new Properties();
+ p.putAll(PROPS);
+ KSQLRestService service = Mockito.mock(KSQLRestService.class);
+ Stubber stubber = Mockito.doAnswer((invocation) -> {
+ Consumer< KSQLResponse> callback = (Consumer< KSQLResponse>)
+ invocation.getArguments()[2];
+ IntStream.range(1, 5)
+ .forEach(i -> {
+ Map<String, Object> map = new HashMap<>();
+ if (i == 4) {
+ map.put("row", null);
+ map.put("terminal", true);
+ } else {
+ map.put("row", Collections.singletonMap("columns", Arrays.asList("value " + i)));
+ map.put("terminal", false);
+ }
+ callback.accept(new KSQLResponse(Arrays.asList("fieldName"), map));
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+ return null;
+ });
+ stubber.when(service).executeQuery(Mockito.any(String.class),
+ Mockito.anyString(),
+ Mockito.any(Consumer.class));
+ Interpreter interpreter = new KSQLInterpreter(p, service);
+
+ // when
+ String query = "select * from orders";
+ interpreter.interpret(query, context);
+
+ // then
+ String expected = "%table fieldName\n" +
+ "value 1\n" +
+ "value 2\n" +
+ "value 3\n";
+ assertEquals(1, context.out.toInterpreterResultMessage().size());
+ assertEquals(expected, context.out.toInterpreterResultMessage().get(0).toString());
+ assertEquals(InterpreterResult.Type.TABLE, context.out
+ .toInterpreterResultMessage().get(0).getType());
+ interpreter.close();
+ }
+
+ @Test
+ public void shouldRenderKSQLNonSelectAsTable() throws InterpreterException,
+ IOException, InterruptedException {
+ // given
+ Properties p = new Properties();
+ p.putAll(PROPS);
+ KSQLRestService service = Mockito.mock(KSQLRestService.class);
+ Map<String, Object> row1 = new HashMap<>();
+ row1.put("name", "orders");
+ row1.put("registered", "false");
+ row1.put("replicaInfo", "[1]");
+ row1.put("consumerCount", "0");
+ row1.put("consumerGroupCount", "0");
+ Map<String, Object> row2 = new HashMap<>();
+ row2.put("name", "orders");
+ row2.put("registered", "false");
+ row2.put("replicaInfo", "[1]");
+ row2.put("consumerCount", "0");
+ row2.put("consumerGroupCount", "0");
+ Stubber stubber = Mockito.doAnswer((invocation) -> {
+ Consumer< KSQLResponse> callback = (Consumer< KSQLResponse>)
+ invocation.getArguments()[2];
+ callback.accept(new KSQLResponse(row1));
+ callback.accept(new KSQLResponse(row2));
+ return null;
+ });
+ stubber.when(service).executeQuery(
+ Mockito.any(String.class),
+ Mockito.anyString(),
+ Mockito.any(Consumer.class));
+ Interpreter interpreter = new KSQLInterpreter(p, service);
+
+ // when
+ String query = "show topics";
+ interpreter.interpret(query, context);
+
+ // then
+ List<Map<String, Object>> expected = Arrays.asList(row1, row2);
+
+ String[] lines = context.out.toInterpreterResultMessage()
+ .get(0).toString()
+ .replace("%table ", "")
+ .trim()
+ .split("\n");
+ List<String[]> rows = Stream.of(lines)
+ .map(line -> line.split("\t"))
+ .collect(Collectors.toList());
+ List<Map<String, String>> actual = rows.stream()
+ .skip(1)
+ .map(row -> IntStream.range(0, row.length)
+ .mapToObj(index -> new AbstractMap.SimpleEntry<>(rows.get(0)[index], row[index]))
+ .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())))
+ .collect(Collectors.toList());
+ assertEquals(1, context.out.toInterpreterResultMessage().size());
+ assertEquals(expected, actual);
+ assertEquals(InterpreterResult.Type.TABLE, context.out
+ .toInterpreterResultMessage().get(0).getType());
+ }
+}
diff --git a/pom.xml b/pom.xml
index 15e6c2c..ffe1802 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@
<module>beam</module>
<module>hazelcastjet</module>
<module>geode</module>
+ <module>ksql</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-jupyter</module>