blob: 8d50debf34795ae33857653795cb99f0040691c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.client;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.common.Message;
import org.apache.zeppelin.common.SessionInfo;
import org.apache.zeppelin.client.websocket.MessageHandler;
import org.apache.zeppelin.client.websocket.StatementMessageHandler;
import org.apache.zeppelin.client.websocket.ZeppelinWebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Each ZSession represent one interpreter process, you can start/stop it, and execute/submit/cancel code.
* There's no Zeppelin concept(like note/paragraph) in ZSession.
*
*/
public class ZSession {
private static final Logger LOGGER = LoggerFactory.getLogger(ZSession.class);
private ZeppelinClient zeppelinClient;
private String interpreter;
private Map<String, String> intpProperties;
// max number of retained statements, each statement represent one paragraph.
private int maxStatement;
private SessionInfo sessionInfo;
private ZeppelinWebSocketClient webSocketClient;
public ZSession(ClientConfig clientConfig,
String interpreter) throws Exception {
this(clientConfig, interpreter, new HashMap<>(), 100);
}
public ZSession(ClientConfig clientConfig,
String interpreter,
Map<String, String> intpProperties,
int maxStatement) throws Exception {
this.zeppelinClient = new ZeppelinClient(clientConfig);
this.interpreter = interpreter;
this.intpProperties = intpProperties;
this.maxStatement = maxStatement;
}
private ZSession(ClientConfig clientConfig,
String interpreter,
String sessionId) throws Exception {
this.zeppelinClient = new ZeppelinClient(clientConfig);
this.interpreter = interpreter;
this.sessionInfo = new SessionInfo(sessionId);
}
public void login(String userName, String password) throws Exception {
this.zeppelinClient.login(userName, password);
}
/**
* Start this ZSession, underneath it would create a note for this ZSession and
* start a dedicated interpreter group.
* This method won't establish websocket connection.
*
* @throws Exception
*/
public void start() throws Exception {
start(null);
}
/**
* Start this ZSession, underneath it would create a note for this ZSession and
* start a dedicated interpreter process.
*
* @param messageHandler
* @throws Exception
*/
public void start(MessageHandler messageHandler) throws Exception {
this.sessionInfo = zeppelinClient.newSession(interpreter);
// inline configuration
StringBuilder builder = new StringBuilder("%" + interpreter + ".conf\n");
if (intpProperties != null) {
for (Map.Entry<String, String> entry : intpProperties.entrySet()) {
builder.append(entry.getKey() + " " + entry.getValue() + "\n");
}
}
String paragraphId = zeppelinClient.addParagraph(getNoteId(), "Session Configuration", builder.toString());
ParagraphResult paragraphResult = zeppelinClient.executeParagraph(getNoteId(), paragraphId, getSessionId());
if (paragraphResult.getStatus() != Status.FINISHED) {
throw new Exception("Fail to configure session, " + paragraphResult.getResultInText());
}
// start session
// add local properties (init) to avoid skip empty paragraph.
paragraphId = zeppelinClient.addParagraph(getNoteId(), "Session Init", "%" + interpreter + "(init=true)");
paragraphResult = zeppelinClient.executeParagraph(getNoteId(), paragraphId, getSessionId());
if (paragraphResult.getStatus() != Status.FINISHED) {
throw new Exception("Fail to init session, " + paragraphResult.getResultInText());
}
this.sessionInfo = zeppelinClient.getSession(getSessionId());
if (messageHandler != null) {
this.webSocketClient = new ZeppelinWebSocketClient(messageHandler);
this.webSocketClient.connect(zeppelinClient.getClientConfig().getZeppelinRestUrl()
.replace("https", "ws").replace("http", "ws") + "/ws");
// call GET_NOTE to establish websocket connection between this session and zeppelin-server
Message msg = new Message(Message.OP.GET_NOTE);
msg.put("id", getNoteId());
this.webSocketClient.send(msg);
}
}
/**
* Stop this underlying interpreter process.
*
* @throws Exception
*/
public void stop() throws Exception {
if (getSessionId() != null) {
zeppelinClient.stopSession(getSessionId());
}
if (webSocketClient != null) {
webSocketClient.stop();
}
}
/**
* Session has been started in ZeppelinServer, this method is just to reconnect it.
* This method is used for connect to an existing session in ZeppelinServer, instead of
* start it from ZSession.
* @throws Exception
*/
public static ZSession createFromExistingSession(ClientConfig clientConfig,
String interpreter,
String sessionId) throws Exception {
return createFromExistingSession(clientConfig, interpreter, sessionId, null);
}
/**
* Session has been started in ZeppelinServer, this method is just to reconnect it.
* This method is used for connect to an existing session in ZeppelinServer, instead of
* start it from ZSession.
* @throws Exception
*/
public static ZSession createFromExistingSession(ClientConfig clientConfig,
String interpreter,
String sessionId,
MessageHandler messageHandler) throws Exception {
ZSession session = new ZSession(clientConfig, interpreter, sessionId);
session.reconnect(messageHandler);
return session;
}
private void reconnect(MessageHandler messageHandler) throws Exception {
this.sessionInfo = this.zeppelinClient.getSession(getSessionId());
if (!sessionInfo.getState().equalsIgnoreCase("Running")) {
throw new Exception("Session " + getSessionId() + " is not running, state: " + sessionInfo.getState());
}
if (messageHandler != null) {
this.webSocketClient = new ZeppelinWebSocketClient(messageHandler);
this.webSocketClient.connect(zeppelinClient.getClientConfig().getZeppelinRestUrl()
.replace("https", "ws").replace("http", "ws") + "/ws");
// call GET_NOTE to establish websocket connection between this session and zeppelin-server
Message msg = new Message(Message.OP.GET_NOTE);
msg.put("id", getNoteId());
this.webSocketClient.send(msg);
}
}
/**
* Run code in non-blocking way.
*
* @param code
* @return
* @throws Exception
*/
public ExecuteResult execute(String code) throws Exception {
return execute("", code);
}
/**
* Run code in non-blocking way.
*
* @param code
* @param localProperties
* @return
* @throws Exception
*/
public ExecuteResult execute(String code, Map<String, String> localProperties) throws Exception {
return execute("", localProperties, code);
}
/**
* Run code in non-blocking way.
*
* @param code
* @param messageHandler
* @return
* @throws Exception
*/
public ExecuteResult execute(String code, StatementMessageHandler messageHandler) throws Exception {
return execute("", code, messageHandler);
}
/**
*
* @param subInterpreter
* @param code
* @return
* @throws Exception
*/
public ExecuteResult execute(String subInterpreter, String code) throws Exception {
return execute(subInterpreter, new HashMap<>(), code);
}
/**
*
* @param subInterpreter
* @param code
* @param messageHandler
* @return
* @throws Exception
*/
public ExecuteResult execute(String subInterpreter, String code, StatementMessageHandler messageHandler) throws Exception {
return execute(subInterpreter, new HashMap<>(), code, messageHandler);
}
/**
*
* @param subInterpreter
* @param localProperties
* @param code
* @return
* @throws Exception
*/
public ExecuteResult execute(String subInterpreter,
Map<String, String> localProperties,
String code) throws Exception {
return execute(subInterpreter, localProperties, code, null);
}
/**
*
* @param subInterpreter
* @param localProperties
* @param code
* @param messageHandler
* @return
* @throws Exception
*/
public ExecuteResult execute(String subInterpreter,
Map<String, String> localProperties,
String code,
StatementMessageHandler messageHandler) throws Exception {
StringBuilder builder = new StringBuilder("%" + interpreter);
if (!StringUtils.isBlank(subInterpreter)) {
builder.append("." + subInterpreter);
}
if (localProperties != null && !localProperties.isEmpty()) {
builder.append("(");
List<String> propertyString = localProperties.entrySet().stream()
.map(entry -> ("\"" + entry.getKey() + "\"=\"" + entry.getValue() + "\""))
.collect(Collectors.toList());
builder.append(StringUtils.join(propertyString, ","));
builder.append(")");
}
builder.append(" " + code);
String text = builder.toString();
String nextParagraphId = zeppelinClient.nextSessionParagraph(getNoteId(), maxStatement);
zeppelinClient.updateParagraph(getNoteId(), nextParagraphId, "", text);
if (messageHandler != null) {
webSocketClient.addStatementMessageHandler(nextParagraphId, messageHandler);
}
ParagraphResult paragraphResult = zeppelinClient.executeParagraph(getNoteId(), nextParagraphId, getSessionId());
return new ExecuteResult(paragraphResult);
}
/**
*
* @param code
* @return
* @throws Exception
*/
public ExecuteResult submit(String code) throws Exception {
return submit("", code);
}
/**
*
* @param code
* @param messageHandler
* @return
* @throws Exception
*/
public ExecuteResult submit(String code, StatementMessageHandler messageHandler) throws Exception {
return submit("", code, messageHandler);
}
/**
*
* @param subInterpreter
* @param code
* @return
* @throws Exception
*/
public ExecuteResult submit(String subInterpreter, String code) throws Exception {
return submit(subInterpreter, new HashMap<>(), code);
}
/**
*
* @param subInterpreter
* @param code
* @param messageHandler
* @return
* @throws Exception
*/
public ExecuteResult submit(String subInterpreter, String code, StatementMessageHandler messageHandler) throws Exception {
return submit(subInterpreter, new HashMap<>(), code, messageHandler);
}
/**
*
* @param subInterpreter
* @param code
* @return
* @throws Exception
*/
public ExecuteResult submit(String subInterpreter, Map<String, String> localProperties, String code) throws Exception {
return submit(subInterpreter, localProperties, code, null);
}
/**
*
* @param subInterpreter
* @param code
* @param messageHandler
* @return
* @throws Exception
*/
public ExecuteResult submit(String subInterpreter,
Map<String, String> localProperties,
String code,
StatementMessageHandler messageHandler) throws Exception {
StringBuilder builder = new StringBuilder("%" + interpreter);
if (!StringUtils.isBlank(subInterpreter)) {
builder.append("." + subInterpreter);
}
if (localProperties != null && !localProperties.isEmpty()) {
builder.append("(");
List<String> propertyString = localProperties.entrySet().stream()
.map(entry -> (entry.getKey() + "=\"" + entry.getValue() + "\""))
.collect(Collectors.toList());
builder.append(StringUtils.join(propertyString, ","));
builder.append(")");
}
builder.append(" " + code);
String text = builder.toString();
String nextParagraphId = zeppelinClient.nextSessionParagraph(getNoteId(), maxStatement);
zeppelinClient.updateParagraph(getNoteId(), nextParagraphId, "", text);
if (messageHandler != null) {
webSocketClient.addStatementMessageHandler(nextParagraphId, messageHandler);
}
ParagraphResult paragraphResult = zeppelinClient.submitParagraph(getNoteId(), nextParagraphId, getSessionId());
return new ExecuteResult(paragraphResult);
}
/**
*
* @param statementId
* @throws Exception
*/
public void cancel(String statementId) throws Exception {
zeppelinClient.cancelParagraph(getNoteId(), statementId);
}
/**
*
* @param statementId
* @return
* @throws Exception
*/
public ExecuteResult queryStatement(String statementId) throws Exception {
ParagraphResult paragraphResult = zeppelinClient.queryParagraphResult(getNoteId(), statementId);
return new ExecuteResult(paragraphResult);
}
/**
*
* @param statementId
* @return
* @throws Exception
*/
public ExecuteResult waitUntilFinished(String statementId) throws Exception {
ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphFinish(getNoteId(), statementId);
return new ExecuteResult(paragraphResult);
}
/**
*
* @param statementId
* @return
* @throws Exception
*/
public ExecuteResult waitUntilRunning(String statementId) throws Exception {
ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphRunning(getNoteId(), statementId);
return new ExecuteResult(paragraphResult);
}
public String getNoteId() {
if (this.sessionInfo != null) {
return this.sessionInfo.getNoteId();
} else {
return null;
}
}
public String getWeburl() {
if (this.sessionInfo != null) {
return sessionInfo.getWeburl();
} else {
return null;
}
}
public String getSessionId() {
if (this.sessionInfo != null) {
return this.sessionInfo.getSessionId();
} else {
return null;
}
}
public String getInterpreter() {
return interpreter;
}
public ZeppelinClient getZeppelinClient() {
return zeppelinClient;
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private ClientConfig clientConfig;
private String interpreter;
private Map<String, String> intpProperties;
private int maxStatement = 100;
public Builder setClientConfig(ClientConfig clientConfig) {
this.clientConfig = clientConfig;
return this;
}
public Builder setInterpreter(String interpreter) {
this.interpreter = interpreter;
return this;
}
public Builder setIntpProperties(Map<String, String> intpProperties) {
this.intpProperties = intpProperties;
return this;
}
public Builder setMaxStatement(int maxStatement) {
this.maxStatement = maxStatement;
return this;
}
public ZSession build() throws Exception {
return new ZSession(clientConfig, interpreter, intpProperties, maxStatement);
}
}
public static void main(String[] args) throws Exception {
ClientConfig clientConfig = new ClientConfig("http://localhost:8080", 1000);
// ZSession hiveSession = new ZSession(clientConfig, "hive", new HashMap<>(), 100);
// hiveSession.start();
//
// ExecuteResult executeResult = hiveSession.submit("show tables");
// executeResult = hiveSession.waitUntilFinished(executeResult.getStatementId());
// System.out.println(executeResult.toString());
//
// executeResult = hiveSession.submit("select eid, count(1) from employee group by eid");
// executeResult = hiveSession.waitUntilFinished(executeResult.getStatementId());
// System.out.println(executeResult.toString());
ZSession sparkSession = ZSession.createFromExistingSession(clientConfig, "hive", "hive_1598418780469");
ExecuteResult executeResult = sparkSession.execute("show databases");
System.out.println(executeResult);
// ExecuteResult executeResult = sparkSession.submit("sql", "show tables");
// executeResult = sparkSession.waitUntilFinished(executeResult.getStatementId());
// System.out.println(executeResult.toString());
//
// executeResult = sparkSession.submit("sql", "select eid, count(1) from employee group by eid");
// executeResult = sparkSession.waitUntilFinished(executeResult.getStatementId());
// System.out.println(executeResult.toString());
}
}