blob: dc3a0857fdbf85db16279e9cd078ea61b3060770 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.service.thrift.impl;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.SessionTimeoutManager;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.metrics.enums.Operation;
import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.ConfigNodeConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER;
import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_TIME_MANAGER;
import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientRPCServiceImpl.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final Coordinator COORDINATOR = Coordinator.getInstance();
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
private final IPartitionFetcher PARTITION_FETCHER;
private final ISchemaFetcher SCHEMA_FETCHER;
public ClientRPCServiceImpl() {
if (config.isClusterMode()) {
PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
} else {
PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
}
}
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
TSStatus loginStatus;
try {
loginStatus = AuthorizerManager.getInstance().checkUser(req.username, req.password);
} catch (ConfigNodeConnectionException e) {
TSStatus tsStatus = RpcUtils.getStatus(TSStatusCode.AUTHENTICATION_ERROR, e.getMessage());
return new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
}
BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
long sessionId = -1;
if (loginStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// check the version compatibility
boolean compatible = req.client_protocol.equals(SessionManager.CURRENT_RPC_VERSION);
if (!compatible) {
openSessionResp.setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode());
openSessionResp.setMessage(
"The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
openSessionResp = openSessionResp.sessionId(sessionId);
} else {
openSessionResp.setCode(loginStatus.getCode());
openSessionResp.setMessage(loginStatus.getMessage());
sessionId = SESSION_MANAGER.requestSessionId(req.username, req.zoneId, clientVersion);
LOGGER.info(
"{}: Login status: {}. User : {}, opens Session-{}",
IoTDBConstant.GLOBAL_DB_NAME,
openSessionResp.getMessage(),
req.username,
sessionId);
SessionTimeoutManager.getInstance().register(sessionId);
openSessionResp = openSessionResp.sessionId(sessionId);
}
} else {
openSessionResp.setMessage(loginStatus.getMessage());
openSessionResp.setCode(loginStatus.getCode());
sessionId = SESSION_MANAGER.requestSessionId(req.username, req.zoneId, clientVersion);
SessionManager.AUDIT_LOGGER.info(
"User {} opens Session failed with an incorrect password", req.username);
SessionTimeoutManager.getInstance().register(sessionId);
openSessionResp = openSessionResp.sessionId(sessionId);
}
TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
return resp.setSessionId(openSessionResp.getSessionId());
}
private IoTDBConstant.ClientVersion parseClientVersion(TSOpenSessionReq req) {
Map<String, String> configuration = req.configuration;
if (configuration != null && configuration.containsKey("version")) {
return IoTDBConstant.ClientVersion.valueOf(configuration.get("version"));
}
return IoTDBConstant.ClientVersion.V_0_12;
}
@Override
public TSStatus closeSession(TSCloseSessionReq req) {
SESSION_MANAGER.releaseSessionResource(req.sessionId, this::cleanupQueryExecution);
return new TSStatus(
!SESSION_MANAGER.closeSession(req.sessionId)
? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
: RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
@Override
public TSStatus cancelOperation(TSCancelOperationReq req) {
// TODO implement
return RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, "Cancellation is not implemented");
}
@Override
public TSStatus closeOperation(TSCloseOperationReq req) {
return SESSION_MANAGER.closeOperation(
req.sessionId,
req.queryId,
req.statementId,
req.isSetStatementId(),
req.isSetQueryId(),
this::cleanupQueryExecution);
}
@Override
public TSGetTimeZoneResp getTimeZone(long sessionId) {
try {
ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
return new TSGetTimeZoneResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
zoneId != null ? zoneId.toString() : "Unknown time zone");
} catch (Exception e) {
return new TSGetTimeZoneResp(
onNPEOrUnexpectedException(
e, OperationType.GET_TIME_ZONE, TSStatusCode.GENERATE_TIME_ZONE_ERROR),
"Unknown time zone");
}
}
@Override
public TSStatus setTimeZone(TSSetTimeZoneReq req) {
try {
SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.SET_TIME_ZONE, TSStatusCode.SET_TIME_ZONE_ERROR);
}
}
@Override
public ServerProperties getProperties() {
ServerProperties properties = new ServerProperties();
properties.setVersion(IoTDBConstant.VERSION);
properties.setBuildInfo(IoTDBConstant.BUILD_INFO);
LOGGER.info("IoTDB server version: {}", IoTDBConstant.VERSION_WITH_BUILD);
properties.setSupportedTimeAggregationOperations(new ArrayList<>());
properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MAX_TIME);
properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME);
properties.setTimestampPrecision(
IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
properties.setMaxConcurrentClientNum(
IoTDBDescriptor.getInstance().getConfig().getRpcMaxConcurrentClientNum());
properties.setWatermarkSecretKey(
IoTDBDescriptor.getInstance().getConfig().getWatermarkSecretKey());
properties.setWatermarkBitString(
IoTDBDescriptor.getInstance().getConfig().getWatermarkBitString());
properties.setWatermarkParamMarkRate(
IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMarkRate());
properties.setWatermarkParamMaxRightBit(
IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMaxRightBit());
properties.setIsReadOnly(CommonDescriptor.getInstance().getConfig().isReadOnly());
properties.setThriftMaxFrameSize(
IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize());
return properties;
}
@Override
public TSStatus setStorageGroup(long sessionId, String storageGroup) {
try {
if (!SESSION_MANAGER.checkLogin(sessionId)) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create storage group {}", SESSION_MANAGER.getCurrSessionId(), storageGroup);
}
// Step 1: Create SetStorageGroupStatement
SetStorageGroupStatement statement =
(SetStorageGroupStatement) StatementGenerator.createStatement(storageGroup);
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.SET_STORAGE_GROUP, TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
}
@Override
public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
}
// measurementAlias is also a nodeName
PathUtils.isLegalSingleMeasurements(Collections.singletonList(req.getMeasurementAlias()));
// Step 1: transfer from TSCreateTimeseriesReq to Statement
CreateTimeSeriesStatement statement =
(CreateTimeSeriesStatement) StatementGenerator.createStatement(req);
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
}
@Override
public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create aligned timeseries {}.{}",
SESSION_MANAGER.getCurrSessionId(),
req.getPrefixPath(),
req.getMeasurements());
}
// check whether measurement is legal according to syntax convention
PathUtils.isLegalSingleMeasurements(req.getMeasurementAlias());
PathUtils.isLegalSingleMeasurements(req.getMeasurements());
// Step 1: transfer from CreateAlignedTimeSeriesReq to Statement
CreateAlignedTimeSeriesStatement statement =
(CreateAlignedTimeSeriesStatement) StatementGenerator.createStatement(req);
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.CREATE_ALIGNED_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
}
@Override
public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create {} timeseries, the first is {}",
SESSION_MANAGER.getCurrSessionId(),
req.getPaths().size(),
req.getPaths().get(0));
}
// check whether measurement is legal according to syntax convention
PathUtils.isLegalSingleMeasurements(req.getMeasurementAliasList());
// Step 1: transfer from CreateMultiTimeSeriesReq to Statement
CreateMultiTimeSeriesStatement statement =
(CreateMultiTimeSeriesStatement) StatementGenerator.createStatement(req);
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.CREATE_MULTI_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
}
@Override
public TSStatus deleteTimeseries(long sessionId, List<String> path) {
throw new UnsupportedOperationException();
}
@Override
public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) {
try {
if (!SESSION_MANAGER.checkLogin(sessionId)) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} delete {} storage groups, the first is {}",
SESSION_MANAGER.getCurrSessionId(),
storageGroups.size(),
storageGroups.get(0));
}
// Step 1: transfer from DeleteStorageGroupsReq to Statement
DeleteStorageGroupStatement statement =
(DeleteStorageGroupStatement) StatementGenerator.createStatement(storageGroups);
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.DELETE_STORAGE_GROUPS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
}
@Override
public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
throw new UnsupportedOperationException();
}
@Override
public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
String statement = req.getStatement();
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
long startTime = System.currentTimeMillis();
try {
Statement s =
StatementGenerator.createStatement(
statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
if (s == null) {
return RpcUtils.getTSExecuteStatementResp(
RpcUtils.getStatus(
TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
}
// permission check
TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return RpcUtils.getTSExecuteStatementResp(status);
}
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
s,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
statement,
PARTITION_FETCHER,
SCHEMA_FETCHER,
req.getTimeout());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
return RpcUtils.getTSExecuteStatementResp(result.status);
}
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
TSExecuteStatementResp resp;
if (queryExecution != null && queryExecution.isQuery()) {
resp = createResponse(queryExecution.getDatasetHeader(), queryId);
resp.setStatus(result.status);
resp.setQueryDataSet(
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
} else {
resp = RpcUtils.getTSExecuteStatementResp(result.status);
}
return resp;
}
} catch (Exception e) {
// TODO call the coordinator to release query resource
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
long costTime = System.currentTimeMillis() - startTime;
if (costTime >= CONFIG.getSlowQueryThreshold()) {
SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
}
}
}
@Override
public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
long t1 = System.currentTimeMillis();
List<TSStatus> results = new ArrayList<>();
boolean isAllSuccessful = true;
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
for (int i = 0; i < req.getStatements().size(); i++) {
String statement = req.getStatements().get(i);
try {
Statement s =
StatementGenerator.createStatement(
statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
if (s == null) {
return RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported");
}
// permission check
TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, s);
long queryId = SESSION_MANAGER.requestQueryId(false);
long t2 = System.currentTimeMillis();
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
s,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
statement,
PARTITION_FETCHER,
SCHEMA_FETCHER,
config.getQueryTimeoutThreshold());
addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
results.add(result.status);
} catch (Exception e) {
LOGGER.error("Error occurred when executing executeBatchStatement: ", e);
TSStatus status =
onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_BATCH_STATEMENT);
if (status.getCode() != TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
isAllSuccessful = false;
}
results.add(status);
}
}
addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1);
return isAllSuccessful
? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch statements successfully")
: RpcUtils.getStatus(results);
}
@Override
public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
return executeStatement(req);
}
@Override
public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
throws TException {
return executeStatement(req);
}
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
}
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId);
try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
TSQueryDataSet result =
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize);
boolean hasResultSet = result.bufferForTime().limit() != 0;
resp.setHasResultSet(hasResultSet);
resp.setQueryDataSet(result);
resp.setIsAlign(true);
QUERY_TIME_MANAGER.unRegisterQuery(req.queryId, false);
if (!hasResultSet) {
COORDINATOR.removeQueryExecution(req.queryId);
}
return resp;
}
} catch (Exception e) {
return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
}
}
@Override
public TSStatus insertRecords(TSInsertRecordsReq req) {
long t1 = System.currentTimeMillis();
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
SESSION_MANAGER.getCurrSessionId(),
req.prefixPaths.get(0),
req.getTimestamps().get(0));
}
// check whether measurement is legal according to syntax convention
PathUtils.isLegalSingleMeasurementLists(req.getMeasurementsList());
// Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
// return success when this statement is empty because server doesn't need to execute it
if (statement.isEmpty()) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.INSERT_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
}
}
@Override
public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
long t1 = System.currentTimeMillis();
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
SESSION_MANAGER.getCurrSessionId(),
req.prefixPath,
req.getTimestamps().get(0));
}
// check whether measurement is legal according to syntax convention
PathUtils.isLegalSingleMeasurementLists(req.getMeasurementsList());
// Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
InsertRowsOfOneDeviceStatement statement =
(InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
// return success when this statement is empty because server doesn't need to execute it
if (statement.isEmpty()) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.INSERT_RECORDS_OF_ONE_DEVICE, TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
}
}
@Override
public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
long t1 = System.currentTimeMillis();
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
SESSION_MANAGER.getCurrSessionId(),
req.prefixPath,
req.getTimestamps().get(0));
}
// check whether measurement is legal according to syntax convention
PathUtils.isLegalSingleMeasurementLists(req.getMeasurementsList());
// Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
InsertRowsOfOneDeviceStatement statement =
(InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
// return success when this statement is empty because server doesn't need to execute it
if (statement.isEmpty()) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e,
OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE,
TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
}
}
@Override
public TSStatus insertRecord(TSInsertRecordReq req) {
long t1 = System.currentTimeMillis();
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
SESSION_MANAGER.getCurrSessionId(),
req.getPrefixPath(),
req.getTimestamp());
// check whether measurement is legal according to syntax convention
PathUtils.isLegalSingleMeasurements(req.getMeasurements());
InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
// return success when this statement is empty because server doesn't need to execute it
if (statement.isEmpty()) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.INSERT_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
}
}
@Override
public TSStatus insertTablets(TSInsertTabletsReq req) {
long t1 = System.currentTimeMillis();
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
PathUtils.isLegalSingleMeasurementLists(req.getMeasurementsList());
// Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
InsertMultiTabletsStatement statement =
(InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
// return success when this statement is empty because server doesn't need to execute it
if (statement.isEmpty()) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
}
}
@Override
public TSStatus insertTablet(TSInsertTabletReq req) {
long t1 = System.currentTimeMillis();
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
// check whether measurement is legal according to syntax convention
PathUtils.isLegalSingleMeasurements(req.getMeasurements());
// Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
InsertTabletStatement statement =
(InsertTabletStatement) StatementGenerator.createStatement(req);
// return success when this statement is empty because server doesn't need to execute it
if (statement.isEmpty()) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
}
}
@Override
public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
long t1 = System.currentTimeMillis();
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
SESSION_MANAGER.getCurrSessionId(),
req.prefixPaths.get(0),
req.getTimestamps().get(0));
}
// check whether measurement is legal according to syntax convention
PathUtils.isLegalSingleMeasurementLists(req.getMeasurementsList());
InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
// return success when this statement is empty because server doesn't need to execute it
if (statement.isEmpty()) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.INSERT_STRING_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
}
}
@Override
public TSStatus testInsertTablet(TSInsertTabletReq req) {
LOGGER.debug("Test insert batch request receive.");
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus testInsertTablets(TSInsertTabletsReq req) {
LOGGER.debug("Test insert batch request receive.");
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus testInsertRecord(TSInsertRecordReq req) {
LOGGER.debug("Test insert row request receive.");
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus testInsertStringRecord(TSInsertStringRecordReq req) {
LOGGER.debug("Test insert string record request receive.");
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus testInsertRecords(TSInsertRecordsReq req) {
LOGGER.debug("Test insert row in batch request receive.");
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
LOGGER.debug("Test insert rows in batch request receive.");
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) {
LOGGER.debug("Test insert string records request receive.");
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus deleteData(TSDeleteDataReq req) {
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
DeleteDataStatement statement = StatementGenerator.createStatement(req);
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.DELETE_DATA, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.DELETE_DATA, TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
}
@Override
public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
long startTime = System.currentTimeMillis();
try {
Statement s =
StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
// permission check
TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return RpcUtils.getTSExecuteStatementResp(status);
}
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Raw Data Query: {}", req.sessionId, req);
long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
s,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER,
req.getTimeout());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException("error code: " + result.status);
}
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
TSExecuteStatementResp resp;
if (queryExecution.isQuery()) {
resp = createResponse(queryExecution.getDatasetHeader(), queryId);
resp.setStatus(result.status);
resp.setQueryDataSet(
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
} else {
resp = RpcUtils.getTSExecuteStatementResp(result.status);
}
return resp;
}
} catch (Exception e) {
// TODO call the coordinator to release query resource
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
long costTime = System.currentTimeMillis() - startTime;
if (costTime >= CONFIG.getSlowQueryThreshold()) {
SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
}
}
}
@Override
public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
long startTime = System.currentTimeMillis();
try {
Statement s =
StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
// permission check
TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return RpcUtils.getTSExecuteStatementResp(status);
}
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Last Data Query: {}", req.sessionId, req);
long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
s,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER,
req.getTimeout());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException("error code: " + result.status);
}
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
TSExecuteStatementResp resp;
if (queryExecution.isQuery()) {
resp = createResponse(queryExecution.getDatasetHeader(), queryId);
resp.setStatus(result.status);
resp.setQueryDataSet(
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
} else {
resp = RpcUtils.getTSExecuteStatementResp(result.status);
}
return resp;
}
} catch (Exception e) {
// TODO call the coordinator to release query resource
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
long costTime = System.currentTimeMillis() - startTime;
if (costTime >= CONFIG.getSlowQueryThreshold()) {
SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
}
}
}
@Override
public long requestStatementId(long sessionId) {
return SESSION_MANAGER.requestStatementId(sessionId);
}
@Override
public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) {
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create schema template {}",
SESSION_MANAGER.getCurrSessionId(),
req.getName());
}
// Step 1: transfer from TSCreateSchemaTemplateReq to Statement
CreateSchemaTemplateStatement statement = StatementGenerator.createStatement(req);
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.CREATE_SCHEMA_TEMPLATE, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.CREATE_SCHEMA_TEMPLATE, TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
}
@Override
public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) {
// todo: check measurement using isLegalSingleMeasurements()
throw new UnsupportedOperationException();
}
@Override
public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
throw new UnsupportedOperationException();
}
@Override
public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
TSQueryTemplateResp resp = new TSQueryTemplateResp();
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
resp.setStatus(getNotLoggedInStatus());
return resp;
}
Statement statement = StatementGenerator.createStatement(req);
if (statement == null) {
resp.setStatus(
RpcUtils.getStatus(
TSStatusCode.UNSUPPORTED_OPERATION,
TemplateQueryType.values()[req.getQueryType()].name() + "has not been supported."));
return resp;
}
switch (TemplateQueryType.values()[req.getQueryType()]) {
case SHOW_MEASUREMENTS:
resp.setQueryType(TemplateQueryType.SHOW_MEASUREMENTS.ordinal());
break;
case SHOW_TEMPLATES:
resp.setQueryType(TemplateQueryType.SHOW_TEMPLATES.ordinal());
break;
case SHOW_SET_TEMPLATES:
resp.setQueryType(TemplateQueryType.SHOW_SET_TEMPLATES.ordinal());
break;
case SHOW_USING_TEMPLATES:
resp.setQueryType(TemplateQueryType.SHOW_USING_TEMPLATES.ordinal());
break;
}
return executeTemplateQueryStatement(statement, req, resp);
} catch (Exception e) {
resp.setStatus(
onNPEOrUnexpectedException(
e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR));
return resp;
}
}
private TSQueryTemplateResp executeTemplateQueryStatement(
Statement statement, TSQueryTemplateReq req, TSQueryTemplateResp resp) {
long startTime = System.currentTimeMillis();
try {
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
resp.setStatus(status);
return resp;
}
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
long queryId = SESSION_MANAGER.requestQueryId(false);
// create and cache dataset
ExecutionResult executionResult =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
null,
PARTITION_FETCHER,
SCHEMA_FETCHER,
config.getQueryTimeoutThreshold());
if (executionResult.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& executionResult.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
resp.setStatus(executionResult.status);
return resp;
}
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
List<String> result = new ArrayList<>();
while (queryExecution.hasNextResult()) {
Optional<TsBlock> tsBlock;
try {
tsBlock = queryExecution.getBatchResult();
} catch (IoTDBException e) {
throw new RuntimeException("Fetch Schema failed. ", e);
}
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
break;
}
Column column = tsBlock.get().getColumn(0);
for (int i = 0; i < column.getPositionCount(); i++) {
result.add(column.getBinary(i).getStringValue());
}
}
resp.setMeasurements(result);
resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"));
return resp;
}
} catch (Exception e) {
resp.setStatus(
onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
return null;
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
long costTime = System.currentTimeMillis() - startTime;
if (costTime >= CONFIG.getSlowQueryThreshold()) {
SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
}
}
}
@Override
public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} set schema template {}.{}",
SESSION_MANAGER.getCurrSessionId(),
req.getTemplateName(),
req.getPrefixPath());
}
// Step 1: transfer from TSCreateSchemaTemplateReq to Statement
SetSchemaTemplateStatement statement = StatementGenerator.createStatement(req);
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IllegalPathException e) {
return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
}
@Override
public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
throw new UnsupportedOperationException();
}
@Override
public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
throw new UnsupportedOperationException();
}
@Override
public TSStatus handshake(TSyncIdentityInfo info) throws TException {
// TODO(sync): Check permissions here
return SyncService.getInstance().handshake(info);
}
@Override
public TSStatus sendPipeData(ByteBuffer buff) throws TException {
return SyncService.getInstance().transportPipeData(buff);
}
@Override
public TSStatus sendFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff) throws TException {
return SyncService.getInstance().transportFile(metaInfo, buff);
}
@Override
public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
long t1 = System.currentTimeMillis();
try {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
SESSION_MANAGER.getCurrSessionId(),
req.getPrefixPath(),
req.getTimestamp());
// check whether measurement is legal according to syntax convention
PathUtils.isLegalSingleMeasurements(req.getMeasurements());
InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
// permission check
TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
} catch (Exception e) {
return onNPEOrUnexpectedException(
e, OperationType.INSERT_STRING_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
}
}
private TSExecuteStatementResp createResponse(DatasetHeader header, long queryId) {
TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
// TODO deal with the sg name here
resp.setSgColumns(new ArrayList<>());
resp.setColumns(header.getRespColumns());
resp.setDataTypeList(header.getRespDataTypeList());
resp.setAliasColumns(header.getRespAliasColumns());
resp.setIgnoreTimeStamp(header.isIgnoreTimestamp());
resp.setQueryId(queryId);
return resp;
}
private TSStatus getNotLoggedInStatus() {
return RpcUtils.getStatus(
TSStatusCode.NOT_LOGIN_ERROR,
"Log in failed. Either you are not authorized or the session has timed out.");
}
/** Add stat of operation into metrics */
private void addOperationLatency(Operation operation, long startTime) {
if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnablePerformanceStat()) {
MetricService.getInstance()
.histogram(
System.currentTimeMillis() - startTime,
"operation_histogram",
MetricLevel.IMPORTANT,
"name",
operation.getName());
MetricService.getInstance()
.count(1, "operation_count", MetricLevel.IMPORTANT, "name", operation.getName());
}
}
@Override
public void handleClientExit() {
Long sessionId = SESSION_MANAGER.getCurrSessionId();
if (sessionId != null) {
TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
closeSession(req);
}
SyncService.getInstance().handleClientExit();
}
private void cleanupQueryExecution(Long queryId) {
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
if (queryExecution != null) {
try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) {
LOGGER.info("[CleanUpQuery]]");
queryExecution.stopAndCleanup();
COORDINATOR.removeQueryExecution(queryId);
}
}
}
}