blob: 0d067dcfefec9b81f1b5801032a52acdf4ae4ae6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.session;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.QueryId;
import org.apache.tajo.SessionVars;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.common.ProtoObject;
import java.util.HashMap;
import java.util.Map;
import static org.apache.tajo.ResourceProtos.SessionProto;
/**
 * Holds the state of a single client session: its identity (session id and user name),
 * the currently selected database, the session variables, an optional query cache and
 * the {@link NonForwardQueryResultScanner}s registered under this session.
 *
 * <p>Access to the session variables is guarded by the monitor of the variable map;
 * access to the scanner map is guarded by the monitor of the scanner map; the current
 * database and the query cache are guarded by the monitor of this object.</p>
 */
public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable {
  private static final Log LOG = LogFactory.getLog(Session.class);

  private final String sessionId;
  private final String userName;
  private String currentDatabase;
  // Deliberately not final: clone() must be able to hand the copy its own map.
  // It is assigned only in the constructors and in clone(), before the object is published.
  private Map<String, String> sessionVariables;
  private final Map<QueryId, NonForwardQueryResultScanner> nonForwardQueryMap =
      new HashMap<QueryId, NonForwardQueryResultScanner>();
  private LoadingCache<String, Expr> cache;

  // transient status
  private volatile long lastAccessTime;

  /**
   * Creates a fresh session whose last access time is the current wall-clock time.
   *
   * @param sessionId    identifier of the session
   * @param userName     name of the user owning the session
   * @param databaseName database that is initially selected
   */
  public Session(String sessionId, String userName, String databaseName) {
    this.sessionId = sessionId;
    this.userName = userName;
    this.currentDatabase = databaseName;
    this.lastAccessTime = System.currentTimeMillis();

    this.sessionVariables = new HashMap<String, String>();
    sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId);
    sessionVariables.put(SessionVars.USERNAME.keyname(), userName);
    selectDatabase(databaseName);
  }

  /**
   * Restores a session from its protobuf representation. The query cache and the
   * registered result scanners are not part of the proto and start out empty.
   *
   * @param proto serialized session, as produced by {@link #getProto()}
   */
  public Session(SessionProto proto) {
    sessionId = proto.getSessionId();
    userName = proto.getUsername();
    currentDatabase = proto.getCurrentDatabase();
    lastAccessTime = proto.getLastAccessTime();
    KeyValueSet keyValueSet = new KeyValueSet(proto.getVariables());
    sessionVariables = keyValueSet.getAllKeyValus();
  }

  /** @return the identifier of this session */
  public String getSessionId() {
    return sessionId;
  }

  /** @return the name of the user owning this session */
  public String getUserName() {
    return userName;
  }

  /** Sets the last access time to the current wall-clock time. */
  public void updateLastAccessTime() {
    lastAccessTime = System.currentTimeMillis();
  }

  /** @return the last access time in milliseconds since the epoch */
  public long getLastAccessTime() {
    return lastAccessTime;
  }

  /**
   * Stores a session variable. The name is first passed through
   * {@link SessionVars#handleDeprecatedName(String)} and the value is stored under
   * the resulting key.
   *
   * @param name  variable name as given by the caller
   * @param value value to store
   */
  public void setVariable(String name, String value) {
    synchronized (sessionVariables) {
      sessionVariables.put(SessionVars.handleDeprecatedName(name), value);
    }
  }

  /**
   * Looks up a session variable.
   *
   * <p>The name is normalized with {@link SessionVars#handleDeprecatedName(String)}
   * before both the existence check and the read, which is the same key
   * {@link #setVariable(String, String)} and {@link #removeVariable(String)} use.
   * Checking the raw name instead would reject a variable that was stored under
   * its normalized key.</p>
   *
   * @param name variable name as given by the caller
   * @return the stored value
   * @throws NoSuchSessionVariableException if no variable is stored under the normalized name
   */
  public String getVariable(String name) throws NoSuchSessionVariableException {
    String key = SessionVars.handleDeprecatedName(name);
    synchronized (sessionVariables) {
      if (!sessionVariables.containsKey(key)) {
        throw new NoSuchSessionVariableException(name);
      }
      return sessionVariables.get(key);
    }
  }

  /**
   * Removes a session variable; the name is normalized in the same way as in
   * {@link #setVariable(String, String)}. Removing an absent variable is a no-op.
   *
   * @param name variable name as given by the caller
   */
  public void removeVariable(String name) {
    synchronized (sessionVariables) {
      sessionVariables.remove(SessionVars.handleDeprecatedName(name));
    }
  }

  /**
   * Returns an immutable snapshot of all session variables.
   *
   * <p>As a side effect the entries for session id, user name, last access time and
   * current database are (re)written into the underlying variable map before the
   * snapshot is taken, so they always reflect the current state.</p>
   *
   * @return an immutable copy of the variable map
   */
  public synchronized Map<String, String> getAllVariables() {
    synchronized (sessionVariables) {
      sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId);
      sessionVariables.put(SessionVars.USERNAME.keyname(), userName);
      sessionVariables.put(SessionVars.SESSION_LAST_ACCESS_TIME.keyname(), String.valueOf(lastAccessTime));
      sessionVariables.put(SessionVars.CURRENT_DATABASE.keyname(), currentDatabase);
      return ImmutableMap.copyOf(sessionVariables);
    }
  }

  /**
   * Changes the currently selected database.
   *
   * @param databaseName name of the database to select
   */
  public synchronized void selectDatabase(String databaseName) {
    this.currentDatabase = databaseName;
  }

  /** @return the name of the currently selected database */
  public synchronized String getCurrentDatabase() {
    return currentDatabase;
  }

  /**
   * Attaches a query cache to this session.
   *
   * @param cache the cache to use
   */
  public synchronized void setQueryCache(LoadingCache<String, Expr> cache) {
    this.cache = cache;
  }

  /** @return the query cache attached to this session, or {@code null} if none was set */
  public synchronized LoadingCache<String, Expr> getQueryCache() {
    return cache;
  }

  /**
   * Serializes session id, user name, current database, last access time and the
   * session variables into a {@link SessionProto}.
   *
   * @return the protobuf representation of this session
   */
  @Override
  public SessionProto getProto() {
    SessionProto.Builder builder = SessionProto.newBuilder();
    builder.setSessionId(getSessionId());
    builder.setUsername(getUserName());
    builder.setCurrentDatabase(getCurrentDatabase());
    builder.setLastAccessTime(lastAccessTime);
    KeyValueSet variables = new KeyValueSet();

    synchronized (sessionVariables) {
      variables.putAll(this.sessionVariables);
      builder.setVariables(variables.getProto());
      return builder.build();
    }
  }

  @Override
  public String toString() {
    return "user=" + getUserName() + ",id=" + getSessionId() +",last_atime=" + getLastAccessTime();
  }

  /**
   * Creates a copy of this session that owns an independent variable map.
   *
   * <p>{@code Object.clone()} only copies references, so without replacing the map the
   * clone and the original would share one {@code HashMap}: variables set on one
   * session would silently appear in the other. The copy therefore receives a new map
   * filled from {@link #getAllVariables()}. The result-scanner map and the query cache
   * are still shared with the original, exactly as {@code Object.clone()} leaves them.</p>
   *
   * @return the cloned session
   * @throws CloneNotSupportedException never thrown by this class itself; declared for subclasses
   */
  @Override
  public Session clone() throws CloneNotSupportedException {
    Session newSession = (Session) super.clone();
    newSession.sessionVariables = new HashMap<String, String>(getAllVariables());
    return newSession;
  }

  /**
   * @param queryId id of the query whose scanner is requested
   * @return the scanner registered for the query, or {@code null} if there is none
   */
  public NonForwardQueryResultScanner getNonForwardQueryResultScanner(QueryId queryId) {
    synchronized (nonForwardQueryMap) {
      return nonForwardQueryMap.get(queryId);
    }
  }

  /**
   * Registers a scanner under the query id it reports, replacing any scanner that was
   * previously registered for the same id.
   *
   * @param resultScanner scanner to register
   */
  public void addNonForwardQueryResultScanner(NonForwardQueryResultScanner resultScanner) {
    synchronized (nonForwardQueryMap) {
      nonForwardQueryMap.put(resultScanner.getQueryId(), resultScanner);
    }
  }

  /**
   * Unregisters and closes the scanner of the given query, if one is registered.
   * The scanner is closed outside the map lock; a failure while closing is logged
   * and not propagated.
   *
   * @param queryId id of the query whose scanner should be closed
   */
  public void closeNonForwardQueryResultScanner(QueryId queryId) {
    NonForwardQueryResultScanner resultScanner;
    synchronized (nonForwardQueryMap) {
      resultScanner = nonForwardQueryMap.remove(queryId);
    }

    if (resultScanner != null) {
      try {
        resultScanner.close();
      } catch (Exception e) {
        LOG.error("NonForwardQueryResultScanne close error: " + e.getMessage(), e);
      }
    }
  }

  /**
   * Closes every registered scanner and empties the scanner map. An {@link Exception}
   * raised by an individual scanner is logged and does not stop the remaining scanners
   * from being closed; any other {@link Throwable} is logged and rethrown wrapped in a
   * {@link RuntimeException}.
   */
  public void close() {
    try {
      synchronized (nonForwardQueryMap) {
        for (NonForwardQueryResultScanner eachQueryScanner: nonForwardQueryMap.values()) {
          try {
            eachQueryScanner.close();
          } catch (Exception e) {
            LOG.error("Error while closing NonForwardQueryResultScanner: " +
                eachQueryScanner.getSessionId() + ", " + e.getMessage(), e);
          }
        }

        nonForwardQueryMap.clear();
      }
    } catch (Throwable t) {
      LOG.error(t.getMessage(), t);
      throw new RuntimeException(t.getMessage(), t);
    }
  }
}