blob: 7d81375c715979eb163a2476732b9aaa4e7f77fe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.server.session;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lens.api.LensSessionHandle;
import org.apache.lens.api.query.QueryHandle;
import org.apache.lens.cube.metadata.CubeMetastoreClient;
import org.apache.lens.server.LensServices;
import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.session.SessionService;
import org.apache.lens.server.util.UtilityMethods;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.session.HiveSessionImpl;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* The Class LensSessionImpl.
*/
@Slf4j
public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable {
/** The persist info. */
private LensSessionPersistInfo persistInfo = new LensSessionPersistInfo();
/** The session timeout. */
private long sessionTimeout;
private static class IntegerThreadLocal extends ThreadLocal<Integer> {
@Override
protected Integer initialValue() {
return 0;
}
public Integer incrementAndGet() {
set(get() + 1);
return get();
}
public Integer decrementAndGet() {
set(get() - 1);
return get();
}
}
private IntegerThreadLocal acquireCount = new IntegerThreadLocal();
/** The conf. */
private Configuration conf = createDefaultConf();
/**
* List of queries which are submitted in this session.
*/
@Getter
private final List<QueryHandle> activeQueries = new ArrayList<>();
/**
* Keep track of DB static resources which failed to be added to this session
*/
private final Map<String, List<ResourceEntry>> failedDBResources = new HashMap<>();
/**
* Cache of database specific class loaders for this session
* This is updated lazily on add/remove resource calls and switch database calls.
*/
private final Map<String, SessionClassLoader> sessionDbClassLoaders = new HashMap<>();
@Setter(AccessLevel.PROTECTED)
private DatabaseResourceService dbResService;
/**
* Inits the persist info.
* @param sessionConf the session conf
*/
private void initPersistInfo(Map<String, String> sessionConf) {
persistInfo.setSessionHandle(new LensSessionHandle(getSessionHandle().getHandleIdentifier().getPublicId(),
getSessionHandle().getHandleIdentifier().getSecretId()));
persistInfo.setUsername(getUserName());
persistInfo.setPassword(getPassword());
persistInfo.setLastAccessTime(System.currentTimeMillis());
persistInfo.setSessionConf(sessionConf);
persistInfo.setProxyUser(sessionConf.get(LensConfConstants.SESSION_PROXY_USER));
if (sessionConf != null) {
for (Map.Entry<String, String> entry : sessionConf.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
}
}
private static Configuration sessionDefaultConfig;
/**
* Creates the default conf.
*
* @return the configuration
*/
public static synchronized Configuration createDefaultConf() {
if (sessionDefaultConfig == null) {
Configuration conf = new Configuration(false);
conf.addResource("lenssession-default.xml");
conf.addResource("lens-site.xml");
sessionDefaultConfig = new Configuration(false);
for (Map.Entry<String, String> prop : conf) {
if (!prop.getKey().startsWith(LensConfConstants.SERVER_PFX)) {
sessionDefaultConfig.set(prop.getKey(), prop.getValue());
}
}
}
//Not exposing sessionDefaultConfig directly to insulate it form modifications
return new Configuration(sessionDefaultConfig);
}
/** The default hive session conf. */
public static final Map<String, String> DEFAULT_HIVE_SESSION_CONF = getHiveSessionConf();
public static Map<String, String> getHiveSessionConf() {
Configuration defaultConf = createDefaultConf();
return defaultConf.getValByRegex("hive.*");
}
/**
* Instantiates a new lens session impl.
*
* @param protocol the protocol
* @param username the username
* @param password the password
* @param serverConf the server conf
* @param ipAddress the ip address
*/
public LensSessionImpl(TProtocolVersion protocol, String username, String password, HiveConf serverConf,
String ipAddress) {
super(protocol, username, password, serverConf, ipAddress);
sessionTimeout = 1000 * serverConf.getLong(LensConfConstants.SESSION_TIMEOUT_SECONDS,
LensConfConstants.SESSION_TIMEOUT_SECONDS_DEFAULT);
}
public Configuration getSessionConf() {
return conf;
}
/**
* Constructor used when restoring session.
*
* @param sessionHandle the session handle
* @param protocol the protocol
* @param username the username
* @param password the password
* @param serverConf the server conf
* @param ipAddress the ip address
*/
public LensSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, String username, String password,
HiveConf serverConf, String ipAddress) {
super(sessionHandle, protocol, username, password, serverConf, ipAddress);
sessionTimeout = 1000 * serverConf.getLong(LensConfConstants.SESSION_TIMEOUT_SECONDS,
LensConfConstants.SESSION_TIMEOUT_SECONDS_DEFAULT);
}
@Override
public void open(Map<String, String> sessionConfMap) throws HiveSQLException {
super.open(sessionConfMap);
initPersistInfo(sessionConfMap);
}
@Override
public void close() throws HiveSQLException {
ClassLoader nonDBClassLoader = getSessionState().getConf().getClassLoader();
super.close();
// Release class loader resources
JavaUtils.closeClassLoadersTo(nonDBClassLoader, getClass().getClassLoader());
synchronized (sessionDbClassLoaders) {
for (Map.Entry<String, SessionClassLoader> entry : sessionDbClassLoaders.entrySet()) {
try {
// Closing session level classloaders up untill the db class loader if present, or null.
// When db class loader is null, the class loader in the session is a single class loader
// which stays as it is on database switch -- provided the new db doesn't have db jars.
// The following line will close class loaders made on top of db class loaders and will close
// only one classloader without closing the parents. In case of no db class loader, the session
// classloader will already have been closed by either super.close() or before this for loop.
JavaUtils.closeClassLoadersTo(entry.getValue(), getDbResService().getClassLoader(entry.getKey()));
} catch (Exception e) {
log.error("Error closing session classloader for session: {}", getSessionHandle().getSessionId(), e);
}
}
sessionDbClassLoaders.clear();
}
// reset classloader in close
Thread.currentThread().setContextClassLoader(LensSessionImpl.class.getClassLoader());
}
public CubeMetastoreClient getCubeMetastoreClient() throws LensException {
try {
CubeMetastoreClient cubeClient = CubeMetastoreClient.getInstance(getHiveConf());
// since cube client's configuration is a copy of the session conf passed, setting classloader in cube client's
// configuration does not modify session conf's classloader.
// We are doing this on the cubeClient instance than doing a copy of conf and setting classloader and pass it to
// cube metastore client because CubeMetastoreClient would have been cached and refreshing the classloader
// should not result in invalidating the CubeMetastoreClient cache.
cubeClient.getConf().setClassLoader(getClassLoader());
return cubeClient;
} catch (HiveException e) {
throw new LensException(e);
}
}
/*
* (non-Javadoc)
*
* @see org.apache.hive.service.cli.session.HiveSessionImpl#acquire()
*/
public void acquire() {
this.acquire(true);
}
@Override
public void acquire(boolean userAccess) {
super.acquire(userAccess);
if (acquireCount.incrementAndGet() == 1) { // first acquire
// Update thread's class loader with current DBs class loader
ClassLoader classLoader = getClassLoader(getCurrentDatabase());
Thread.currentThread().setContextClassLoader(classLoader);
SessionState.getSessionConf().setClassLoader(classLoader);
}
setActive();
}
/*
* (non-Javadoc)
*
* @see org.apache.hive.service.cli.session.HiveSessionImpl#release()
*/
public void release() {
this.release(true);
}
@Override
public synchronized void release(boolean userAccess) {
setActive();
if (acquireCount.decrementAndGet() == 0) {
super.release(userAccess);
// reset classloader in release
Thread.currentThread().setContextClassLoader(LensSessionImpl.class.getClassLoader());
}
}
public boolean isActive() {
// session is active, if any active operations are present.
// If no active operations are present, session is active if timeout is not reached and session is not
// marked for close
return activeOperationsPresent() || ((System.currentTimeMillis() - persistInfo.lastAccessTime < sessionTimeout)
&& !persistInfo.markedForClose);
}
public boolean isMarkedForClose() {
return persistInfo.isMarkedForClose();
}
public synchronized void setActive() {
setLastAccessTime(System.currentTimeMillis());
}
/**
* Sets the config.
*
* @param config the config to overlay
*/
public void setConfig(Map<String, String> config) {
persistInfo.getConfig().putAll(config);
}
/**
* Removes the resource.
*
* @param type the type
* @param path the path
*/
public void removeResource(String type, String path) {
Iterator<ResourceEntry> itr = persistInfo.getResources().iterator();
while (itr.hasNext()) {
ResourceEntry res = itr.next();
if (res.getType().equalsIgnoreCase(type) && res.getUri().equals(path)) {
itr.remove();
}
}
// New classloaders will be created. Remove resource is expensive, add resource is cheap.
updateAllSessionClassLoaders();
}
/**
* Adds the resource.
*
* @param type the type
* @param path the path
* @param finalLocation The final location where resources is downloaded
*/
public void addResource(String type, String path, String finalLocation) {
ResourceEntry resource = new ResourceEntry(type, path, finalLocation);
persistInfo.getResources().add(resource);
// The following call updates the existing classloaders without creating new instances.
// Add resource is cheap :)
addResourceToAllSessionClassLoaders(resource);
}
protected List<ResourceEntry> getResources() {
return persistInfo.getResources();
}
protected Map<String, String> getConfig() {
return persistInfo.getConfig();
}
public void setCurrentDatabase(String currentDatabase) {
persistInfo.setDatabase(currentDatabase);
getSessionState().setCurrentDatabase(currentDatabase);
// Make sure entry is there in classloader cache
synchronized (sessionDbClassLoaders) {
updateSessionDbClassLoader(currentDatabase);
}
}
private SessionClassLoader getUpdatedSessionClassLoader(String database) {
ClassLoader dbClassLoader = getDbResService().getClassLoader(database);
if (dbClassLoader == null) {
return null;
}
URL[] urls = new URL[0];
if (persistInfo.getResources() != null) {
int i = 0;
urls = new URL[persistInfo.getResources().size()];
for (LensSessionImpl.ResourceEntry res : persistInfo.getResources()) {
try {
urls[i++] = new URL(res.getUri());
} catch (MalformedURLException e) {
log.error("Invalid URL {} with location: {} adding to db {}", res.getUri(), res.getLocation(), database, e);
}
}
}
if (sessionDbClassLoaders.containsKey(database)
&& Arrays.equals(sessionDbClassLoaders.get(database).getURLs(), urls)) {
return sessionDbClassLoaders.get(database);
}
return new SessionClassLoader(urls, dbClassLoader);
}
private void updateSessionDbClassLoader(String database) {
SessionClassLoader updatedClassLoader = getUpdatedSessionClassLoader(database);
if (updatedClassLoader != null) {
sessionDbClassLoaders.put(database, updatedClassLoader);
}
}
private void updateAllSessionClassLoaders() {
synchronized (sessionDbClassLoaders) {
// Update all DB class loaders
for (String database: sessionDbClassLoaders.keySet()) {
updateSessionDbClassLoader(database);
}
}
}
private void addResourceToClassLoader(String database, ResourceEntry res) {
if (sessionDbClassLoaders.containsKey(database)) {
SessionClassLoader sessionClassLoader = sessionDbClassLoaders.get(database);
try {
sessionClassLoader.addURL(new URL(res.getLocation()));
} catch (MalformedURLException e) {
log.error("Invalid URL {} with location: {} adding to db {}", res.getUri(), res.getLocation(), database, e);
}
}
}
private void addResourceToAllSessionClassLoaders(ResourceEntry res) {
synchronized (sessionDbClassLoaders) {
// Update all DB class loaders
for (String database: sessionDbClassLoaders.keySet()) {
addResourceToClassLoader(database, res);
}
}
}
private boolean areResourcesAdded() {
return persistInfo.getResources() != null && !persistInfo.getResources().isEmpty();
}
private DatabaseResourceService getDbResService() {
if (dbResService == null) {
HiveSessionService sessionService = LensServices.get().getService(SessionService.NAME);
return sessionService.getDatabaseResourceService();
} else {
return dbResService;
}
}
protected ClassLoader getClassLoader(String database) {
synchronized (sessionDbClassLoaders) {
if (sessionDbClassLoaders.containsKey(database)) {
return sessionDbClassLoaders.get(database);
} else {
ClassLoader classLoader = getDbResService().getClassLoader(database);
if (classLoader == null) {
log.debug("DB resource service gave null class loader for {}", database);
} else {
if (areResourcesAdded()) {
log.debug("adding resources for {}", database);
// We need to update DB specific classloader with added resources
updateSessionDbClassLoader(database);
classLoader = sessionDbClassLoaders.get(database);
}
}
return classLoader == null ? getSessionState().getConf().getClassLoader() : classLoader;
}
}
}
public String getCurrentDatabase() {
return getSessionState().getCurrentDatabase();
}
/*
* (non-Javadoc)
*
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return getSessionHandle().getHandleIdentifier().toString();
}
public String getLoggedInUser() {
return getHiveConf().get(LensConfConstants.SESSION_LOGGEDIN_USER);
}
public String getClusterUser() {
return getUserName();
}
public LensSessionPersistInfo getLensSessionPersistInfo() {
return persistInfo;
}
public void setLastAccessTime(long lastAccessTime) {
persistInfo.lastAccessTime = lastAccessTime;
}
public long getLastAccessTime() {
return persistInfo.lastAccessTime;
}
/**
* Return resources which are added statically to the database
* @return db resources
*/
public Collection<ResourceEntry> getDBResources(String database) {
synchronized (failedDBResources) {
List<ResourceEntry> failed = failedDBResources.get(database);
if (failed == null && getDbResService().getResourcesForDatabase(database) != null) {
failed = new ArrayList<>(getDbResService().getResourcesForDatabase(database));
failedDBResources.put(database, failed);
}
return failed;
}
}
/**
* Get session's resources which have to be added for the given database
*/
public Collection<ResourceEntry> getPendingSessionResourcesForDatabase(String database) {
List<ResourceEntry> pendingResources = new ArrayList<>();
for (ResourceEntry res : persistInfo.getResources()) {
if (!res.isAddedToDatabase(database)) {
pendingResources.add(res);
}
}
return pendingResources;
}
/**
* @return effective class loader for this session
*/
public ClassLoader getClassLoader() {
return getClassLoader(getCurrentDatabase());
}
public void markForClose() {
log.info("Marking session {} for close. Operations on this session will be rejected", this);
persistInfo.markedForClose = true;
}
/**
* The Class ResourceEntry.
*/
public static class ResourceEntry {
/** The type. */
@Getter
final String type;
@Getter
final String uri;
/** The final location. */
@Getter
String location;
// For tests
/** The restore count. */
transient AtomicInteger restoreCount = new AtomicInteger();
/** Set of databases for which this resource has been added */
final transient Set<String> databases = new HashSet<>();
/**
* Instantiates a new resource entry.
*
* @param type the type
* @param uri the uri of resource
*/
public ResourceEntry(String type, String uri) {
this(type, uri, uri);
}
public ResourceEntry(String type, String uri, String location) {
if (type == null || uri == null || location == null) {
throw new NullPointerException("ResourceEntry type or uri or location cannot be null");
}
this.type = type.toUpperCase();
this.uri = uri;
this.location = location;
}
public boolean isAddedToDatabase(String database) {
return databases.contains(database);
}
public void addToDatabase(String database) {
databases.add(database);
}
/**
* Restored resource.
*/
public void restoredResource() {
restoreCount.incrementAndGet();
}
/**
* @return the value of restoreCount for the resource
*/
public int getRestoreCount() {
return restoreCount.get();
}
/*
* (non-Javadoc)
*
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "type=" + type + " path=" + location;
}
@Override
public int hashCode() {
return type.hashCode() + 31 * location.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ResourceEntry) {
ResourceEntry other = (ResourceEntry) obj;
return type.equals(other.type) && location.equals(other.location);
}
return false;
}
}
/**
* The Class LensSessionPersistInfo.
*/
@Data
public static class LensSessionPersistInfo implements Externalizable {
/** The resources. */
private List<ResourceEntry> resources = new ArrayList<>();
/** The config. */
private Map<String, String> config = new HashMap<>();
/** The session handle. */
private LensSessionHandle sessionHandle;
/** The database. */
private String database;
/** The username. */
private String username;
/** The password. */
private String password;
/** The last access time. */
private long lastAccessTime;
/** Whether it's marked for close */
private boolean markedForClose;
/** The proxy user which is initiating the request. This could be null */
private String proxyUser;
public void setSessionConf(Map<String, String> sessionConf) {
UtilityMethods.mergeMaps(config, sessionConf, true);
}
/*
* (non-Javadoc)
*
* @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
*/
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(sessionHandle.toString());
out.writeUTF(database == null ? "default" : database);
out.writeUTF(username == null ? "" : username);
out.writeUTF(password == null ? "" : password);
out.writeInt(resources.size());
for (ResourceEntry resource : resources) {
out.writeUTF(resource.getType());
out.writeUTF(resource.getUri());
}
out.writeInt(config.size());
for (String key : config.keySet()) {
out.writeUTF(key);
out.writeUTF(config.get(key));
}
out.writeLong(lastAccessTime);
out.writeBoolean(markedForClose);
out.writeUTF(proxyUser == null ? "" : proxyUser);
}
/*
* (non-Javadoc)
*
* @see java.io.Externalizable#readExternal(java.io.ObjectInput)
*/
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
sessionHandle = LensSessionHandle.valueOf(in.readUTF());
database = in.readUTF();
username = in.readUTF();
password = in.readUTF();
int resSize = in.readInt();
resources.clear();
for (int i = 0; i < resSize; i++) {
String type = in.readUTF();
String uri = in.readUTF();
resources.add(new ResourceEntry(type, uri));
}
config.clear();
int cfgSize = in.readInt();
for (int i = 0; i < cfgSize; i++) {
String key = in.readUTF();
String val = in.readUTF();
config.put(key, val);
}
lastAccessTime = in.readLong();
markedForClose = in.readBoolean();
proxyUser = in.readUTF();
}
}
public void addToActiveQueries(QueryHandle queryHandle) {
log.info("Adding {} to active queries for session {}", queryHandle, this);
synchronized (this.activeQueries) {
activeQueries.add(queryHandle);
}
}
public void removeFromActiveQueries(QueryHandle queryHandle) {
log.info("Removing {} from active queries for session {}", queryHandle, this);
synchronized (this.activeQueries) {
activeQueries.remove(queryHandle);
}
}
public boolean activeOperationsPresent() {
synchronized (this.activeQueries) {
return !activeQueries.isEmpty();
}
}
}