blob: 647a4729bde019d13eb3cdbca750b6833915dbec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.protocol.session;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.audit.AuditLogger;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.protocol.thrift.OperationType;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo;
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
public class SessionManager implements SessionManagerMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
// When the client abnormally exits, we can still know who to disconnect
/** currSession can be only used in client-thread model services. */
private final ThreadLocal<IClientSession> currSession = new ThreadLocal<>();
private final ThreadLocal<Long> currSessionIdleTime = new ThreadLocal<>();
// sessions does not contain MqttSessions..
private final Map<IClientSession, Object> sessions = new ConcurrentHashMap<>();
// used for sessions.
private final Object placeHolder = new Object();
private final AtomicLong sessionIdGenerator = new AtomicLong();
// The statementId is unique in one IoTDB instance.
private final AtomicLong statementIdGenerator = new AtomicLong();
private static final AuthorStatement AUTHOR_STATEMENT = new AuthorStatement(StatementType.AUTHOR);
public static final TSProtocolVersion CURRENT_RPC_VERSION =
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
private static final boolean ENABLE_AUDIT_LOG =
IoTDBDescriptor.getInstance().getConfig().isEnableAuditLog();
protected SessionManager() {
// singleton
String mbeanName =
String.format(
"%s:%s=%s",
IoTDBConstant.IOTDB_SERVICE_JMX_NAME,
IoTDBConstant.JMX_TYPE,
ServiceType.SESSION_MANAGER.getJmxName());
JMXService.registerMBean(this, mbeanName);
}
public BasicOpenSessionResp login(
IClientSession session,
String username,
String password,
String zoneId,
TSProtocolVersion tsProtocolVersion,
IoTDBConstant.ClientVersion clientVersion) {
TSStatus loginStatus;
BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
loginStatus = AuthorityChecker.checkUser(username, password);
if (loginStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// check the version compatibility
if (!tsProtocolVersion.equals(CURRENT_RPC_VERSION)) {
openSessionResp
.sessionId(-1)
.setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode())
.setMessage("The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
} else {
supplySession(session, username, ZoneId.of(zoneId), clientVersion);
openSessionResp
.sessionId(session.getId())
.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode())
.setMessage("Login successfully");
LOGGER.info(
"{}: Login status: {}. User : {}, opens Session-{}",
IoTDBConstant.GLOBAL_DB_NAME,
openSessionResp.getMessage(),
username,
session);
if (ENABLE_AUDIT_LOG) {
AuditLogger.log(
String.format(
"%s: Login status: %s. User : %s, opens Session-%s",
IoTDBConstant.GLOBAL_DB_NAME, openSessionResp.getMessage(), username, session),
AUTHOR_STATEMENT);
}
}
} else {
if (ENABLE_AUDIT_LOG) {
AuditLogger.log(
String.format("User %s opens Session failed with an incorrect password", username),
AUTHOR_STATEMENT);
}
openSessionResp.sessionId(-1).setMessage(loginStatus.message).setCode(loginStatus.code);
}
return openSessionResp;
}
public boolean closeSession(IClientSession session, LongConsumer releaseByQueryId) {
releaseSessionResource(session, releaseByQueryId);
MetricService.getInstance()
.remove(
MetricType.HISTOGRAM,
Metric.SESSION_IDLE_TIME.toString(),
Tag.NAME.toString(),
String.valueOf(session.getId()));
// TODO we only need to do so when query is killed by time out close the socket.
IClientSession session1 = currSession.get();
if (session1 != null && session != session1) {
if (ENABLE_AUDIT_LOG) {
AuditLogger.log(
String.format(
"The client-%s is trying to close another session %s, pls check if it's a bug",
session, session1),
AUTHOR_STATEMENT);
}
return false;
} else {
if (ENABLE_AUDIT_LOG) {
AuditLogger.log(String.format("Session-%s is closing", session), AUTHOR_STATEMENT);
}
return true;
}
}
private void releaseSessionResource(IClientSession session, LongConsumer releaseQueryResource) {
Iterable<Long> statementIds = session.getStatementIds();
if (statementIds != null) {
for (Long statementId : statementIds) {
Set<Long> queryIdSet = session.removeStatementId(statementId);
if (queryIdSet != null) {
for (Long queryId : queryIdSet) {
releaseQueryResource.accept(queryId);
}
}
}
}
}
public TSStatus closeOperation(
IClientSession session,
long queryId,
long statementId,
boolean haveStatementId,
boolean haveSetQueryId,
LongConsumer releaseByQueryId) {
if (!checkLogin(session)) {
return RpcUtils.getStatus(
TSStatusCode.NOT_LOGIN,
"Log in failed. Either you are not authorized or the session has timed out.");
}
try {
if (haveStatementId) {
if (haveSetQueryId) {
this.closeDataset(session, statementId, queryId, releaseByQueryId);
} else {
this.closeStatement(session, statementId, releaseByQueryId);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} else {
return RpcUtils.getStatus(
TSStatusCode.CLOSE_OPERATION_ERROR, "statement id not set by client.");
}
} catch (Exception e) {
return onNpeOrUnexpectedException(
e, OperationType.CLOSE_OPERATION, TSStatusCode.CLOSE_OPERATION_ERROR);
}
}
/**
* Check whether current user has logged in.
*
* @return true: If logged in; false: If not logged in
*/
public boolean checkLogin(IClientSession session) {
boolean isLoggedIn = session != null && session.isLogin();
if (!isLoggedIn) {
LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME);
}
return isLoggedIn;
}
public long requestStatementId(IClientSession session) {
long statementId = statementIdGenerator.incrementAndGet();
session.addStatementId(statementId);
return statementId;
}
public void closeStatement(
IClientSession session, long statementId, LongConsumer releaseByQueryId) {
Set<Long> queryIdSet = session.removeStatementId(statementId);
if (queryIdSet != null) {
for (Long queryId : queryIdSet) {
releaseByQueryId.accept(queryId);
}
}
session.removeStatementId(statementId);
}
public long requestQueryId(IClientSession session, Long statementId) {
long queryId = requestQueryId();
session.addQueryId(statementId, queryId);
return queryId;
}
public long requestQueryId() {
return QueryResourceManager.getInstance().assignQueryId();
}
/** this method can be only used in client-thread model. */
public IClientSession getCurrSession() {
return currSession.get();
}
/** get current session and update session idle time. */
public IClientSession getCurrSessionAndUpdateIdleTime() {
IClientSession clientSession = getCurrSession();
Long idleTime = currSessionIdleTime.get();
if (idleTime == null) {
currSessionIdleTime.set(System.nanoTime());
} else {
MetricService.getInstance()
.getOrCreateHistogram(
Metric.SESSION_IDLE_TIME.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
String.valueOf(clientSession.getId()))
.update(System.nanoTime() - idleTime);
}
return clientSession;
}
/** update connection idle time after execution. */
public void updateIdleTime() {
currSessionIdleTime.set(System.nanoTime());
}
public TimeZone getSessionTimeZone() {
IClientSession session = currSession.get();
if (session != null) {
return session.getTimeZone();
} else {
// only used for test
return TimeZone.getTimeZone(ZoneId.systemDefault());
}
}
/**
* this method can be only used in client-thread model. But, in message-thread model based
* service, calling this method has no side effect. <br>
* MUST CALL THIS METHOD IN client-thread model services. Fortunately, we can just call this
* method in thrift's event handler.
*/
public void removeCurrSession() {
IClientSession session = currSession.get();
sessions.remove(session);
currSession.remove();
currSessionIdleTime.remove();
}
/**
* this method can be only used in client-thread model. Do not use this method in message-thread
* model based service.
*
* @return false if the session has been initialized.
*/
public boolean registerSession(IClientSession session) {
if (this.currSession.get() != null) {
LOGGER.error("the client session is registered repeatedly, pls check whether this is a bug.");
return false;
}
this.currSession.set(session);
this.currSessionIdleTime.set(System.nanoTime());
sessions.put(session, placeHolder);
return true;
}
/** must be called after registerSession()) will mark the session login. */
public void supplySession(
IClientSession session,
String username,
ZoneId zoneId,
IoTDBConstant.ClientVersion clientVersion) {
session.setId(sessionIdGenerator.incrementAndGet());
session.setUsername(username);
session.setZoneId(zoneId);
session.setClientVersion(clientVersion);
session.setLogin(true);
session.setLogInTime(System.currentTimeMillis());
}
public void closeDataset(
IClientSession session, Long statementId, Long queryId, LongConsumer releaseByQueryId) {
releaseByQueryId.accept(queryId);
session.removeQueryId(statementId, queryId);
}
public static SessionManager getInstance() {
return SessionManagerHelper.INSTANCE;
}
public SessionInfo getSessionInfo(IClientSession session) {
return new SessionInfo(
session.getId(), session.getUsername(), session.getZoneId(), session.getClientVersion());
}
@Override
public Set<String> getAllRpcClients() {
return this.sessions.keySet().stream()
.map(IClientSession::toString)
.collect(Collectors.toSet());
}
public TSConnectionInfoResp getAllConnectionInfo() {
return new TSConnectionInfoResp(
sessions.keySet().stream()
.filter(s -> StringUtils.isNotEmpty(s.getUsername()))
.map(IClientSession::convertToTSConnectionInfo)
.sorted(Comparator.comparingLong(TSConnectionInfo::getLogInTime))
.collect(Collectors.toList()));
}
private static class SessionManagerHelper {
private static final SessionManager INSTANCE = new SessionManager();
private SessionManagerHelper() {
// empty constructor
}
}
}