blob: d1ef7168b117b9b5075aaa6d9fbd628755b979d2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.server.session;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.*;
import java.util.concurrent.*;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.WebApplicationException;
import org.apache.lens.api.LensSessionHandle;
import org.apache.lens.server.BaseLensService;
import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.health.HealthStatus;
import org.apache.lens.server.api.session.*;
import org.apache.lens.server.session.LensSessionImpl.ResourceEntry;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.SystemVariables;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationHandle;
import com.google.common.collect.Maps;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* The Class HiveSessionService.
*/
@Slf4j
public class HiveSessionService extends BaseLensService implements SessionService {
/** The restorable sessions. */
private List<LensSessionImpl.LensSessionPersistInfo> restorableSessions;
/** The session expiry thread. */
private ScheduledExecutorService sessionExpiryThread;
/** The session expiry runnable. */
private Runnable sessionExpiryRunnable = new SessionExpiryRunnable();
/** Service to manage database specific resources */
@Getter(AccessLevel.PROTECTED)
private DatabaseResourceService databaseResourceService;
/**
* The conf.
*/
private Configuration conf;
/**
* Instantiates a new hive session service.
*
* @param cliService the cli service
*/
public HiveSessionService(CLIService cliService) {
super(NAME, cliService);
}
@Override
public List<String> listAllResources(LensSessionHandle sessionHandle, String type) {
if (!isValidResouceType(type)) {
throw new BadRequestException("Bad resource type is passed. Please pass jar or file as source type");
}
List<ResourceEntry> resources = getSession(sessionHandle).getResources();
List<String> allResources = new ArrayList<String>();
for (ResourceEntry resource : resources) {
if (type == null || resource.getType().equalsIgnoreCase(type)) {
allResources.add(resource.toString());
}
}
return allResources;
}
private boolean isValidResouceType(String type) {
return (type == null || type.equalsIgnoreCase("jar") || type.equalsIgnoreCase("file"));
}
/**
* {@inheritDoc}
*/
@Override
public void addResource(LensSessionHandle sessionid, String type, String path) {
try {
acquire(sessionid);
SessionState ss = getSession(sessionid).getSessionState();
String finalLocation = ss.add_resource(SessionState.ResourceType.valueOf(type.toUpperCase()), path);
getSession(sessionid).addResource(type, path, finalLocation);
} catch (RuntimeException e) {
log.error("Failed to add resource type:" + type + " path:" + path + " in session", e);
throw new WebApplicationException(e);
} finally {
release(sessionid);
}
}
private void addResourceUponRestart(LensSessionHandle sessionid, ResourceEntry resourceEntry) {
try {
acquire(sessionid);
SessionState ss = getSession(sessionid).getSessionState();
resourceEntry.location = ss.add_resource(SessionState.ResourceType.valueOf(resourceEntry.getType()),
resourceEntry.getUri());
if (resourceEntry.location == null) {
throw new NullPointerException("Resource's final location cannot be null");
}
} finally {
release(sessionid);
}
}
/**
* {@inheritDoc}
*/
@Override
public void deleteResource(LensSessionHandle sessionid, String type, String path) {
String command = "delete " + type.toLowerCase() + " " + path;
try {
acquire(sessionid);
closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null));
getSession(sessionid).removeResource(type, path);
} catch (HiveSQLException e) {
throw new WebApplicationException(e);
} finally {
release(sessionid);
}
}
/**
* Gets the session param.
*
* @param sessionConf the session conf
* @param ss the ss
* @param varname the varname
* @return the session param
*/
private String getSessionParam(Configuration sessionConf, SessionState ss, String varname) {
if (varname.indexOf(SystemVariables.HIVEVAR_PREFIX) == 0) {
String var = varname.substring(SystemVariables.HIVEVAR_PREFIX.length());
if (ss.getHiveVariables().get(var) != null) {
return SystemVariables.HIVEVAR_PREFIX + var + "=" + ss.getHiveVariables().get(var);
} else {
throw new NotFoundException(varname + " is undefined as a hive variable");
}
} else {
String var;
if (varname.indexOf(SystemVariables.HIVECONF_PREFIX) == 0) {
var = varname.substring(SystemVariables.HIVECONF_PREFIX.length());
} else {
var = varname;
}
if (sessionConf.get(var) != null) {
return varname + "=" + sessionConf.get(var);
} else {
throw new NotFoundException(varname + " is undefined");
}
}
}
/**
* {@inheritDoc}
*/
@Override
public LensSessionHandle openSession(String username, String password, String database,
Map<String, String> configuration)
throws LensException {
LensSessionHandle sessionid = super.openSession(username, password, configuration);
log.info("Opened session " + sessionid + " for user " + username);
notifyEvent(new SessionOpened(System.currentTimeMillis(), sessionid, username));
// Set current database
if (StringUtils.isNotBlank(database)) {
try {
if (!Hive.get(getSession(sessionid).getHiveConf()).databaseExists(database)) {
closeSession(sessionid);
log.info("Closed session " + sessionid.getPublicId().toString() + " as db " + database + " does not exist");
throw new NotFoundException("Database " + database + " does not exist");
}
} catch (Exception e) {
if (!(e instanceof NotFoundException)) {
try {
closeSession(sessionid);
} catch (LensException e2) {
log.error("Error closing session " + sessionid.getPublicId().toString(), e2);
}
log.error("Error in checking if database exists " + database, e);
throw new LensException("Error in checking if database exists" + database, e);
} else {
throw (NotFoundException) e;
}
}
getSession(sessionid).setCurrentDatabase(database);
log.info("Set database to " + database + " for session " + sessionid.getPublicId());
}
// add auxuiliary jars
String[] auxJars = getSession(sessionid).getSessionConf().getStrings(LensConfConstants.AUX_JARS);
if (auxJars != null) {
for (String jar : auxJars) {
log.info("Adding aux jar:" + jar);
addResource(sessionid, "jar", jar);
}
}
return sessionid;
}
@Override
public boolean isOpen(LensSessionHandle sessionHandle) {
return SESSION_MAP.containsKey(sessionHandle.getPublicId().toString());
}
/**
* {@inheritDoc}
*/
@Override
public List<String> getAllSessionParameters(LensSessionHandle sessionid, boolean verbose, String key)
throws LensException {
List<String> result = new ArrayList<String>();
acquire(sessionid);
try {
SessionState ss = getSession(sessionid).getSessionState();
if (!StringUtils.isBlank(key)) {
result.add(getSessionParam(getSession(sessionid).getSessionConf(), ss, key));
} else {
SortedMap<String, String> sortedMap = new TreeMap<String, String>();
sortedMap.put("silent", (ss.getIsSilent() ? "on" : "off"));
for (String s : ss.getHiveVariables().keySet()) {
sortedMap.put(SystemVariables.HIVEVAR_PREFIX + s, ss.getHiveVariables().get(s));
}
for (Map.Entry<String, String> entry : getSession(sessionid).getSessionConf()) {
sortedMap.put(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
result.add(entry.toString());
}
}
} finally {
release(sessionid);
}
return result;
}
/**
* {@inheritDoc}
*/
@Override
public void setSessionParameter(LensSessionHandle sessionid, String key, String value) {
HashMap<String, String> config = Maps.newHashMap();
config.put(key, value);
setSessionParameters(sessionid, config);
}
/**
* Sets the session parameter.
*
* @param sessionid the sessionid
* @param config map of string-string. each entry represents key and the value to be set for that key
*/
protected void setSessionParameters(LensSessionHandle sessionid, Map<String, String> config) {
log.info("Request to Set params:" + config);
try {
acquire(sessionid);
// set in session conf
for(Map.Entry<String, String> entry: config.entrySet()) {
String var = entry.getKey();
if (var.indexOf(SystemVariables.HIVECONF_PREFIX) == 0) {
var = var.substring(SystemVariables.HIVECONF_PREFIX.length());
}
getSession(sessionid).getSessionConf().set(var, entry.getValue());
String command = "set" + " " + entry.getKey() + "= " + entry.getValue();
closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null));
}
// add to persist
getSession(sessionid).setConfig(config);
log.info("Set params:" + config);
} catch (HiveSQLException e) {
throw new WebApplicationException(e);
} finally {
release(sessionid);
}
}
private void setSessionParametersOnRestore(LensSessionHandle sessionid, Map<String, String> config) {
// set in session conf
for(Map.Entry<String, String> entry: config.entrySet()) {
String var = entry.getKey();
if (var.indexOf(SystemVariables.HIVECONF_PREFIX) == 0) {
var = var.substring(SystemVariables.HIVECONF_PREFIX.length());
}
getSession(sessionid).getSessionConf().set(var, entry.getValue());
getSession(sessionid).getHiveConf().set(entry.getKey(), entry.getValue());
}
log.info("Set params on restart:" + config);
}
/*
* (non-Javadoc)
*
* @see org.apache.hive.service.CompositeService#init()
*/
@Override
public synchronized void init(HiveConf hiveConf) {
this.databaseResourceService = new DatabaseResourceService(DatabaseResourceService.NAME);
addService(this.databaseResourceService);
this.conf = hiveConf;
super.init(hiveConf);
}
/*
* (non-Javadoc)
*
* @see org.apache.hive.service.CompositeService#start()
*/
@Override
public synchronized void start() {
super.start();
sessionExpiryThread = Executors.newSingleThreadScheduledExecutor(runnable
-> new Thread(runnable, "Session-expiry-thread"));
int sessionExpiryInterval = getSessionExpiryInterval();
sessionExpiryThread.scheduleWithFixedDelay(sessionExpiryRunnable, sessionExpiryInterval,
sessionExpiryInterval, TimeUnit.SECONDS);
// Restore sessions if any
if (restorableSessions == null || restorableSessions.size() <= 0) {
log.info("No sessions to restore");
return;
}
for (LensSessionImpl.LensSessionPersistInfo persistInfo : restorableSessions) {
try {
LensSessionHandle sessionHandle = persistInfo.getSessionHandle();
restoreSession(sessionHandle, persistInfo.getUsername(), persistInfo.getPassword());
LensSessionImpl session = getSession(sessionHandle);
session.getLensSessionPersistInfo().setLastAccessTime(persistInfo.getLastAccessTime());
session.getLensSessionPersistInfo().setConfig(persistInfo.getConfig());
session.getLensSessionPersistInfo().setResources(persistInfo.getResources());
session.setCurrentDatabase(persistInfo.getDatabase());
session.getLensSessionPersistInfo().setMarkedForClose(persistInfo.isMarkedForClose());
// Add resources for restored sessions
for (LensSessionImpl.ResourceEntry resourceEntry : session.getResources()) {
try {
addResourceUponRestart(sessionHandle, resourceEntry);
} catch (Exception e) {
log.error("Failed to restore resource for session: " + session + " resource: " + resourceEntry, e);
}
}
// Add config for restored sessions
try{
setSessionParametersOnRestore(sessionHandle, session.getConfig());
} catch (Exception e) {
log.error("Error setting parameters " + session.getConfig()
+ " for session: " + session, e);
}
log.info("Restored session " + persistInfo.getSessionHandle().getPublicId());
notifyEvent(new SessionRestored(System.currentTimeMillis(), sessionHandle));
} catch (LensException e) {
throw new RuntimeException(e);
}
}
log.info("Session service restored " + restorableSessions.size() + " sessions");
}
private int getSessionExpiryInterval() {
return conf.getInt(LensConfConstants.SESSION_EXPIRY_SERVICE_INTERVAL_IN_SECS,
LensConfConstants.DEFAULT_SESSION_EXPIRY_SERVICE_INTERVAL_IN_SECS);
}
/*
* (non-Javadoc)
*
* @see org.apache.hive.service.CompositeService#stop()
*/
@Override
public synchronized void stop() {
super.stop();
if (sessionExpiryThread != null) {
sessionExpiryThread.shutdownNow();
}
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.LensService#writeExternal(java.io.ObjectOutput)
*/
@Override
public void writeExternal(ObjectOutput out) throws IOException {
// Write out all the sessions
List<LensSessionImpl> sessions = new ArrayList<>();
for (LensSessionHandle sessionHandle : SESSION_MAP.values()) {
try {
sessions.add(getSession(sessionHandle));
} catch (ClientErrorException e) {
// warn for invalid/null session and continue.
log.warn("Cannot persist " + (sessionHandle != null ? sessionHandle.getPublicId() : "null ")
+ " session. {}", e);
}
}
out.writeInt(sessions.size());
for (LensSessionImpl session : sessions) {
session.getLensSessionPersistInfo().writeExternal(out);
}
log.info("Session service pesristed " + sessions.size() + " sessions out of " + SESSION_MAP.size());
}
/**
* {@inheritDoc}
*/
@Override
public HealthStatus getHealthStatus() {
return this.getServiceState().equals(STATE.STARTED)
? new HealthStatus(true, "Hive session service is healthy.")
: new HealthStatus(false, "Hive session service is down.");
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.LensService#readExternal(java.io.ObjectInput)
*/
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
int numSessions = in.readInt();
restorableSessions = new ArrayList<LensSessionImpl.LensSessionPersistInfo>();
for (int i = 0; i < numSessions; i++) {
LensSessionImpl.LensSessionPersistInfo persistInfo = new LensSessionImpl.LensSessionPersistInfo();
persistInfo.readExternal(in);
restorableSessions.add(persistInfo);
SESSION_MAP.put(persistInfo.getSessionHandle().getPublicId().toString(), persistInfo.getSessionHandle());
}
log.info("Session service recovered " + SESSION_MAP.size() + " sessions");
}
/**
* {@inheritDoc}
*/
@Override
public void closeSession(LensSessionHandle sessionHandle) throws LensException {
closeInternal(sessionHandle);
notifyEvent(new SessionClosed(System.currentTimeMillis(), sessionHandle));
}
@Override
public void cleanupIdleSessions() throws LensException {
ScheduledFuture<?> schedule = sessionExpiryThread.schedule(sessionExpiryRunnable, 0, TimeUnit.MILLISECONDS);
// wait till completion
try {
schedule.get();
} catch (InterruptedException | ExecutionException e) {
throw new LensException(e);
}
}
/**
* Close a Lens server session
* @param sessionHandle session handle
* @throws LensException
*/
private void closeInternal(LensSessionHandle sessionHandle) throws LensException {
super.closeSession(sessionHandle);
}
/**
* Close operation created for underlying CLI service
* @param op operation handle
*/
private void closeCliServiceOp(OperationHandle op) {
if (op != null) {
try {
getCliService().closeOperation(op);
} catch (HiveSQLException e) {
log.error("Error closing operation " + op.getHandleIdentifier(), e);
}
}
}
public Runnable getSessionExpiryRunnable() {
return sessionExpiryRunnable;
}
/**
* The Class SessionExpiryRunnable.
*/
public class SessionExpiryRunnable implements Runnable {
/**
* Run internal.
*/
public void runInternal() {
List<LensSessionHandle> sessionsToRemove = new ArrayList<>(SESSION_MAP.values());
Iterator<LensSessionHandle> itr = sessionsToRemove.iterator();
while (itr.hasNext()) {
LensSessionHandle sessionHandle = itr.next();
try {
LensSessionImpl session = getSession(sessionHandle);
if (session.isActive()) {
itr.remove();
}
} catch (ClientErrorException nfe) {
log.error("Error getting session " + sessionHandle.getPublicId(), nfe);
itr.remove();
}
}
log.info("Sessions to remove : {} out of {} all sessions", sessionsToRemove.size(), SESSION_MAP.size());
// Now close all inactive sessions
for (LensSessionHandle sessionHandle : sessionsToRemove) {
try {
long lastAccessTime = getSession(sessionHandle).getLastAccessTime();
closeInternal(sessionHandle);
log.info("Closed inactive session " + sessionHandle.getPublicId() + " last accessed at "
+ new Date(lastAccessTime));
notifyEvent(new SessionExpired(System.currentTimeMillis(), sessionHandle));
} catch (ClientErrorException nfe) {
log.error("Error getting session " + sessionHandle.getPublicId(), nfe);
// Do nothing
} catch (LensException e) {
log.error("Error closing session " + sessionHandle.getPublicId() + " reason " + e.getMessage(), e);
}
}
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
log.info("Running session expiry run");
runInternal();
} catch (Exception e) {
log.warn("Unknown error while checking for inactive sessions - ", e);
}
}
}
}