blob: 6fc4ab4da7c74f3ad919e760758a4a3d89efb016 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.session;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateDeviceTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSSetDeviceTemplateReq;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, ignore todos
public class Session {
private static final Logger logger = LoggerFactory.getLogger(Session.class);
protected static final TSProtocolVersion protocolVersion =
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data type:";
public static final String MSG_DONOT_ENABLE_REDIRECT =
"Query do not enable redirect," + " please confirm the session and server conf.";
protected String username;
protected String password;
protected int fetchSize;
private static final byte TYPE_NULL = -2;
/**
* Timeout of query can be set by users. If not set, default value 0 will be used, which will use
* server configuration.
*/
private long timeout = 0;
protected boolean enableRPCCompression;
protected int connectionTimeoutInMs;
protected ZoneId zoneId;
protected int thriftDefaultBufferSize;
protected int thriftMaxFrameSize;
protected EndPoint defaultEndPoint;
protected SessionConnection defaultSessionConnection;
private boolean isClosed = true;
// Cluster version cache
protected boolean enableCacheLeader;
protected SessionConnection metaSessionConnection;
protected Map<String, EndPoint> deviceIdToEndpoint;
protected Map<EndPoint, SessionConnection> endPointToSessionConnection;
private AtomicReference<IoTDBConnectionException> tmp = new AtomicReference<>();
protected boolean enableQueryRedirection = false;
public Session(String host, int rpcPort) {
this(
host,
rpcPort,
Config.DEFAULT_USER,
Config.DEFAULT_PASSWORD,
Config.DEFAULT_FETCH_SIZE,
null,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
Config.DEFAULT_MAX_FRAME_SIZE,
Config.DEFAULT_CACHE_LEADER_MODE);
}
public Session(String host, String rpcPort, String username, String password) {
this(
host,
Integer.parseInt(rpcPort),
username,
password,
Config.DEFAULT_FETCH_SIZE,
null,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
Config.DEFAULT_MAX_FRAME_SIZE,
Config.DEFAULT_CACHE_LEADER_MODE);
}
public Session(String host, int rpcPort, String username, String password) {
this(
host,
rpcPort,
username,
password,
Config.DEFAULT_FETCH_SIZE,
null,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
Config.DEFAULT_MAX_FRAME_SIZE,
Config.DEFAULT_CACHE_LEADER_MODE);
}
public Session(String host, int rpcPort, String username, String password, int fetchSize) {
this(
host,
rpcPort,
username,
password,
fetchSize,
null,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
Config.DEFAULT_MAX_FRAME_SIZE,
Config.DEFAULT_CACHE_LEADER_MODE);
}
public Session(
String host, int rpcPort, String username, String password, int fetchSize, long timeoutInMs) {
this(
host,
rpcPort,
username,
password,
fetchSize,
null,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
Config.DEFAULT_MAX_FRAME_SIZE,
Config.DEFAULT_CACHE_LEADER_MODE);
this.timeout = timeoutInMs;
}
public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
this(
host,
rpcPort,
username,
password,
Config.DEFAULT_FETCH_SIZE,
zoneId,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
Config.DEFAULT_MAX_FRAME_SIZE,
Config.DEFAULT_CACHE_LEADER_MODE);
}
public Session(
String host, int rpcPort, String username, String password, boolean enableCacheLeader) {
this(
host,
rpcPort,
username,
password,
Config.DEFAULT_FETCH_SIZE,
null,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
Config.DEFAULT_MAX_FRAME_SIZE,
enableCacheLeader);
}
public Session(
String host,
int rpcPort,
String username,
String password,
int fetchSize,
ZoneId zoneId,
boolean enableCacheLeader) {
this(
host,
rpcPort,
username,
password,
fetchSize,
zoneId,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
Config.DEFAULT_MAX_FRAME_SIZE,
enableCacheLeader);
}
@SuppressWarnings("squid:S107")
public Session(
String host,
int rpcPort,
String username,
String password,
int fetchSize,
ZoneId zoneId,
int thriftDefaultBufferSize,
int thriftMaxFrameSize,
boolean enableCacheLeader) {
this.defaultEndPoint = new EndPoint(host, rpcPort);
this.username = username;
this.password = password;
this.fetchSize = fetchSize;
this.zoneId = zoneId;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
this.enableCacheLeader = enableCacheLeader;
}
public void setFetchSize(int fetchSize) {
this.fetchSize = fetchSize;
}
public int getFetchSize() {
return this.fetchSize;
}
public synchronized void open() throws IoTDBConnectionException {
open(false, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
public synchronized void open(boolean enableRPCCompression) throws IoTDBConnectionException {
open(enableRPCCompression, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
throws IoTDBConnectionException {
if (!isClosed) {
return;
}
this.enableRPCCompression = enableRPCCompression;
this.connectionTimeoutInMs = connectionTimeoutInMs;
defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
metaSessionConnection = defaultSessionConnection;
isClosed = false;
if (enableCacheLeader || enableQueryRedirection) {
deviceIdToEndpoint = new HashMap<>();
endPointToSessionConnection = new HashMap<>();
endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
}
}
public synchronized void close() throws IoTDBConnectionException {
if (isClosed) {
return;
}
try {
if (enableCacheLeader) {
for (SessionConnection sessionConnection : endPointToSessionConnection.values()) {
sessionConnection.close();
}
} else {
defaultSessionConnection.close();
}
} finally {
isClosed = true;
}
}
public SessionConnection constructSessionConnection(
Session session, EndPoint endpoint, ZoneId zoneId) throws IoTDBConnectionException {
return new SessionConnection(session, endpoint, zoneId);
}
public synchronized String getTimeZone() {
return defaultSessionConnection.getTimeZone();
}
public synchronized void setTimeZone(String zoneId)
throws StatementExecutionException, IoTDBConnectionException {
defaultSessionConnection.setTimeZone(zoneId);
}
public void setStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
try {
metaSessionConnection.setStorageGroup(storageGroup);
} catch (RedirectException e) {
handleMetaRedirection(storageGroup, e);
}
}
public void deleteStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
try {
metaSessionConnection.deleteStorageGroups(Collections.singletonList(storageGroup));
} catch (RedirectException e) {
handleMetaRedirection(storageGroup, e);
}
}
public void deleteStorageGroups(List<String> storageGroups)
throws IoTDBConnectionException, StatementExecutionException {
try {
metaSessionConnection.deleteStorageGroups(storageGroups);
} catch (RedirectException e) {
handleMetaRedirection(storageGroups.toString(), e);
}
}
public void createTimeseries(
String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateTimeseriesReq request =
genTSCreateTimeseriesReq(path, dataType, encoding, compressor, null, null, null, null);
defaultSessionConnection.createTimeseries(request);
}
public void createTimeseries(
String path,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressor,
Map<String, String> props,
Map<String, String> tags,
Map<String, String> attributes,
String measurementAlias)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateTimeseriesReq request =
genTSCreateTimeseriesReq(
path, dataType, encoding, compressor, props, tags, attributes, measurementAlias);
defaultSessionConnection.createTimeseries(request);
}
private TSCreateTimeseriesReq genTSCreateTimeseriesReq(
String path,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressor,
Map<String, String> props,
Map<String, String> tags,
Map<String, String> attributes,
String measurementAlias) {
TSCreateTimeseriesReq request = new TSCreateTimeseriesReq();
request.setPath(path);
request.setDataType(dataType.ordinal());
request.setEncoding(encoding.ordinal());
request.setCompressor(compressor.ordinal());
request.setProps(props);
request.setTags(tags);
request.setAttributes(attributes);
request.setMeasurementAlias(measurementAlias);
return request;
}
public void createAlignedTimeseries(
String devicePath,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
CompressionType compressor,
List<String> measurementAliasList)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateAlignedTimeseriesReq request =
getTSCreateAlignedTimeseriesReq(
devicePath, measurements, dataTypes, encodings, compressor, measurementAliasList);
defaultSessionConnection.createAlignedTimeseries(request);
}
private TSCreateAlignedTimeseriesReq getTSCreateAlignedTimeseriesReq(
String devicePath,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
CompressionType compressor,
List<String> measurementAliasList) {
TSCreateAlignedTimeseriesReq request = new TSCreateAlignedTimeseriesReq();
request.setDevicePath(devicePath);
request.setMeasurements(measurements);
request.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
request.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
request.setCompressor(compressor.ordinal());
request.setMeasurementAlias(measurementAliasList);
return request;
}
public void createMultiTimeseries(
List<String> paths,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
List<CompressionType> compressors,
List<Map<String, String>> propsList,
List<Map<String, String>> tagsList,
List<Map<String, String>> attributesList,
List<String> measurementAliasList)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateMultiTimeseriesReq request =
genTSCreateMultiTimeseriesReq(
paths,
dataTypes,
encodings,
compressors,
propsList,
tagsList,
attributesList,
measurementAliasList);
defaultSessionConnection.createMultiTimeseries(request);
}
private TSCreateMultiTimeseriesReq genTSCreateMultiTimeseriesReq(
List<String> paths,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
List<CompressionType> compressors,
List<Map<String, String>> propsList,
List<Map<String, String>> tagsList,
List<Map<String, String>> attributesList,
List<String> measurementAliasList) {
TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
request.setPaths(paths);
List<Integer> dataTypeOrdinals = new ArrayList<>(dataTypes.size());
for (TSDataType dataType : dataTypes) {
dataTypeOrdinals.add(dataType.ordinal());
}
request.setDataTypes(dataTypeOrdinals);
List<Integer> encodingOrdinals = new ArrayList<>(dataTypes.size());
for (TSEncoding encoding : encodings) {
encodingOrdinals.add(encoding.ordinal());
}
request.setEncodings(encodingOrdinals);
List<Integer> compressionOrdinals = new ArrayList<>(paths.size());
for (CompressionType compression : compressors) {
compressionOrdinals.add(compression.ordinal());
}
request.setCompressors(compressionOrdinals);
request.setPropsList(propsList);
request.setTagsList(tagsList);
request.setAttributesList(attributesList);
request.setMeasurementAliasList(measurementAliasList);
return request;
}
public boolean checkTimeseriesExists(String path)
throws IoTDBConnectionException, StatementExecutionException {
return defaultSessionConnection.checkTimeseriesExists(path, timeout);
}
public void setTimeout(long timeoutInMs) throws StatementExecutionException {
if (timeoutInMs < 0) {
throw new StatementExecutionException("Timeout must be >= 0, please check and try again.");
}
this.timeout = timeoutInMs;
}
public long getTimeout() {
return timeout;
}
/**
* execute query sql
*
* @param sql query statement
* @return result set
*/
public SessionDataSet executeQueryStatement(String sql)
throws StatementExecutionException, IoTDBConnectionException {
return executeStatementMayRedirect(sql, timeout);
}
/**
* execute query sql with explicit timeout
*
* @param sql query statement
* @param timeoutInMs the timeout of this query, in milliseconds
* @return result set
*/
public SessionDataSet executeQueryStatement(String sql, long timeoutInMs)
throws StatementExecutionException, IoTDBConnectionException {
if (timeoutInMs < 0) {
throw new StatementExecutionException("Timeout must be >= 0, please check and try again.");
}
return executeStatementMayRedirect(sql, timeoutInMs);
}
/**
* execute the query, may redirect query to other node.
*
* @param sql the query statement
* @param timeoutInMs time in ms
* @return data set
* @throws StatementExecutionException statement is not right
* @throws IoTDBConnectionException the network is not good
*/
private SessionDataSet executeStatementMayRedirect(String sql, long timeoutInMs)
throws StatementExecutionException, IoTDBConnectionException {
try {
logger.info("{} execute sql {}", defaultSessionConnection.getEndPoint(), sql);
return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
logger.debug(
"{} redirect query {} to {}",
defaultSessionConnection.getEndPoint(),
sql,
e.getEndPoint());
// retry
try {
return defaultSessionConnection.executeQueryStatement(sql, timeout);
} catch (RedirectException redirectException) {
logger.error("{} redirect twice", sql, redirectException);
throw new StatementExecutionException(sql + " redirect twice, please try again.");
}
} else {
throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
}
}
}
/**
* execute non query statement
*
* @param sql non query statement
*/
public void executeNonQueryStatement(String sql)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.executeNonQueryStatement(sql);
}
/**
* query eg. select * from paths where time >= startTime and time < endTime time interval include
* startTime and exclude endTime
*
* @param paths series path
* @param startTime included
* @param endTime excluded
* @return data set
* @throws StatementExecutionException statement is not right
* @throws IoTDBConnectionException the network is not good
*/
public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
throws StatementExecutionException, IoTDBConnectionException {
try {
return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
logger.debug("redirect query {} to {}", paths, e.getEndPoint());
// retry
try {
return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
} catch (RedirectException redirectException) {
logger.error("Redirect twice", redirectException);
throw new StatementExecutionException("Redirect twice, please try again.");
}
} else {
throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
}
}
}
/**
* insert data in one row, if you want to improve your performance, please use insertRecords
* method or insertTablet method
*
* @see Session#insertRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
public void insertRecord(
String deviceId,
long time,
List<String> measurements,
List<TSDataType> types,
Object... values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordReq request =
genTSInsertRecordReq(deviceId, time, measurements, types, Arrays.asList(values));
insertRecord(deviceId, request);
}
private void insertRecord(String deviceId, TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
try {
getSessionConnection(deviceId).insertRecord(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
}
}
private void insertRecord(String deviceId, TSInsertStringRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
try {
getSessionConnection(deviceId).insertRecord(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
}
}
private SessionConnection getSessionConnection(String deviceId) {
EndPoint endPoint;
if (enableCacheLeader && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
return endPointToSessionConnection.get(endPoint);
} else {
return defaultSessionConnection;
}
}
private void handleMetaRedirection(String storageGroup, RedirectException e)
throws IoTDBConnectionException {
if (enableCacheLeader) {
logger.debug("storageGroup[{}]:{}", storageGroup, e.getMessage());
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
e.getEndPoint(),
k -> {
try {
return constructSessionConnection(this, e.getEndPoint(), zoneId);
} catch (IoTDBConnectionException ex) {
tmp.set(ex);
return null;
}
});
if (connection == null) {
throw new IoTDBConnectionException(tmp.get());
}
metaSessionConnection = connection;
}
}
private void handleRedirection(String deviceId, EndPoint endpoint)
throws IoTDBConnectionException {
if (enableCacheLeader) {
deviceIdToEndpoint.put(deviceId, endpoint);
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
endpoint,
k -> {
try {
return constructSessionConnection(this, endpoint, zoneId);
} catch (IoTDBConnectionException ex) {
tmp.set(ex);
return null;
}
});
if (connection == null) {
throw new IoTDBConnectionException(tmp.get());
}
}
}
private void handleQueryRedirection(EndPoint endPoint) throws IoTDBConnectionException {
if (enableQueryRedirection) {
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
endPoint,
k -> {
try {
SessionConnection sessionConnection =
constructSessionConnection(this, endPoint, zoneId);
sessionConnection.setEnableRedirect(enableQueryRedirection);
return sessionConnection;
} catch (IoTDBConnectionException ex) {
tmp.set(ex);
return null;
}
});
if (connection == null) {
throw new IoTDBConnectionException(tmp.get());
}
defaultSessionConnection = connection;
}
}
/**
* insert data in one row, if you want improve your performance, please use insertInBatch method
* or insertBatch method
*
* @see Session#insertRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
public void insertRecord(
String deviceId,
long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types, values);
insertRecord(deviceId, request);
}
private TSInsertRecordReq genTSInsertRecordReq(
String deviceId,
long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException {
TSInsertRecordReq request = new TSInsertRecordReq();
request.setDeviceId(deviceId);
request.setTimestamp(time);
request.setMeasurements(measurements);
ByteBuffer buffer = ByteBuffer.allocate(calculateLength(types, values));
putValues(types, values, buffer);
request.setValues(buffer);
return request;
}
/**
* insert data in one row, if you want improve your performance, please use insertInBatch method
* or insertBatch method
*
* @see Session#insertRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
public void insertRecord(
String deviceId, long time, List<String> measurements, List<String> values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertStringRecordReq request =
genTSInsertStringRecordReq(deviceId, time, measurements, values);
insertRecord(deviceId, request);
}
private TSInsertStringRecordReq genTSInsertStringRecordReq(
String deviceId, long time, List<String> measurements, List<String> values) {
TSInsertStringRecordReq request = new TSInsertStringRecordReq();
request.setDeviceId(deviceId);
request.setTimestamp(time);
request.setMeasurements(measurements);
request.setValues(values);
return request;
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
*
* <p>Each row is independent, which could have different deviceId, time, number of measurements
*
* @see Session#insertTablet(Tablet)
*/
public void insertRecords(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
if (enableCacheLeader) {
insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList);
} else {
TSInsertStringRecordsReq request =
genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList);
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException ignored) {
// ignore
}
}
}
private void insertStringRecordsWithLeaderCache(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
Map<String, TSInsertStringRecordsReq> deviceGroup = new HashMap<>();
for (int i = 0; i < deviceIds.size(); i++) {
TSInsertStringRecordsReq request =
deviceGroup.computeIfAbsent(deviceIds.get(i), k -> new TSInsertStringRecordsReq());
updateTSInsertStringRecordsReq(
request, deviceIds.get(i), times.get(i), measurementsList.get(i), valuesList.get(i));
}
// TODO parallel
StringBuilder errMsgBuilder = new StringBuilder();
for (Entry<String, TSInsertStringRecordsReq> entry : deviceGroup.entrySet()) {
try {
getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
} catch (RedirectException e) {
handleRedirection(entry.getKey(), e.getEndPoint());
} catch (StatementExecutionException e) {
errMsgBuilder.append(e.getMessage());
}
}
String errMsg = errMsgBuilder.toString();
if (!errMsg.isEmpty()) {
throw new StatementExecutionException(errMsg);
}
}
private TSInsertStringRecordsReq genTSInsertStringRecordsReq(
List<String> deviceId,
List<Long> time,
List<List<String>> measurements,
List<List<String>> values) {
TSInsertStringRecordsReq request = new TSInsertStringRecordsReq();
request.setDeviceIds(deviceId);
request.setTimestamps(time);
request.setMeasurementsList(measurements);
request.setValuesList(values);
return request;
}
private void updateTSInsertStringRecordsReq(
TSInsertStringRecordsReq request,
String deviceId,
long time,
List<String> measurements,
List<String> values) {
request.addToDeviceIds(deviceId);
request.addToTimestamps(time);
request.addToMeasurementsList(measurements);
request.addToValuesList(values);
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
*
* <p>Each row is independent, which could have different deviceId, time, number of measurements
*
* @see Session#insertTablet(Tablet)
*/
public void insertRecords(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
if (enableCacheLeader) {
insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList);
} else {
TSInsertRecordsReq request =
genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList);
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException ignored) {
// ignore
}
}
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
*
* <p>Each row is independent, which could have different deviceId, time, number of measurements
*
* @see Session#insertTablet(Tablet)
*/
public void insertRecordsOfOneDevice(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
*
* <p>Each row is independent, which could have different deviceId, time, number of measurements
*
* @param haveSorted whether the times have been sorted
* @see Session#insertTablet(Tablet)
*/
public void insertRecordsOfOneDevice(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList,
boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
int len = times.size();
if (len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
TSInsertRecordsOfOneDeviceReq request =
genTSInsertRecordsOfOneDeviceReq(
deviceId, times, measurementsList, typesList, valuesList, haveSorted);
try {
getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
}
}
private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList,
boolean haveSorted)
throws IoTDBConnectionException, BatchExecutionException {
// check params size
int len = times.size();
if (len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"times, measurementsList and valuesList's size should be equal");
}
if (haveSorted) {
if (!checkSorted(times)) {
throw new BatchExecutionException(
"Times in InsertOneDeviceRecords are not in ascending order");
}
} else {
// sort
Integer[] index = new Integer[times.size()];
for (int i = 0; i < times.size(); i++) {
index[i] = i;
}
Arrays.sort(index, Comparator.comparingLong(times::get));
times.sort(Long::compareTo);
// sort measurementList
measurementsList = sortList(measurementsList, index);
// sort typesList
typesList = sortList(typesList, index);
// sort values
valuesList = sortList(valuesList, index);
}
TSInsertRecordsOfOneDeviceReq request = new TSInsertRecordsOfOneDeviceReq();
request.setDeviceId(deviceId);
request.setTimestamps(times);
request.setMeasurementsList(measurementsList);
List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
request.setValuesList(buffersList);
return request;
}
@SuppressWarnings("squid:S3740")
private List sortList(List source, Integer[] index) {
Object[] result = new Object[source.size()];
for (int i = 0; i < index.length; i++) {
result[i] = source.get(index[i]);
}
return Arrays.asList(result);
}
private List<ByteBuffer> objectValuesListToByteBufferList(
List<List<Object>> valuesList, List<List<TSDataType>> typesList)
throws IoTDBConnectionException {
List<ByteBuffer> buffersList = new ArrayList<>();
for (int i = 0; i < valuesList.size(); i++) {
ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
putValues(typesList.get(i), valuesList.get(i), buffer);
buffersList.add(buffer);
}
return buffersList;
}
private void insertRecordsWithLeaderCache(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
Map<String, TSInsertRecordsReq> deviceGroup = new HashMap<>();
for (int i = 0; i < deviceIds.size(); i++) {
TSInsertRecordsReq request =
deviceGroup.computeIfAbsent(deviceIds.get(i), k -> new TSInsertRecordsReq());
updateTSInsertRecordsReq(
request,
deviceIds.get(i),
times.get(i),
measurementsList.get(i),
typesList.get(i),
valuesList.get(i));
}
// TODO parallel
StringBuilder errMsgBuilder = new StringBuilder();
for (Entry<String, TSInsertRecordsReq> entry : deviceGroup.entrySet()) {
try {
getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
} catch (RedirectException e) {
handleRedirection(entry.getKey(), e.getEndPoint());
} catch (StatementExecutionException e) {
errMsgBuilder.append(e.getMessage());
}
}
String errMsg = errMsgBuilder.toString();
if (!errMsg.isEmpty()) {
throw new StatementExecutionException(errMsg);
}
}
private TSInsertRecordsReq genTSInsertRecordsReq(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException {
TSInsertRecordsReq request = new TSInsertRecordsReq();
request.setDeviceIds(deviceIds);
request.setTimestamps(times);
request.setMeasurementsList(measurementsList);
List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
request.setValuesList(buffersList);
return request;
}
private void updateTSInsertRecordsReq(
TSInsertRecordsReq request,
String deviceId,
Long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException {
request.addToDeviceIds(deviceId);
request.addToTimestamps(time);
request.addToMeasurementsList(measurements);
ByteBuffer buffer = ByteBuffer.allocate(calculateLength(types, values));
putValues(types, values, buffer);
request.addToValuesList(buffer);
}
/**
* insert the data of a device. For each timestamp, the number of measurements is the same.
*
* <p>a Tablet example: device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3
*
* <p>times in Tablet may be not in ascending order
*
* @param tablet data batch
*/
public void insertTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
EndPoint endPoint;
try {
if (enableCacheLeader && (endPoint = deviceIdToEndpoint.get(tablet.deviceId)) != null) {
endPointToSessionConnection.get(endPoint).insertTablet(request);
} else {
defaultSessionConnection.insertTablet(request);
}
} catch (RedirectException e) {
handleRedirection(tablet.deviceId, e.getEndPoint());
}
}
/**
* insert a Tablet
*
* @param tablet data batch
* @param sorted whether times in Tablet are in ascending order
*/
public void insertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
EndPoint endPoint;
try {
if (enableCacheLeader && (endPoint = deviceIdToEndpoint.get(tablet.deviceId)) != null) {
endPointToSessionConnection.get(endPoint).insertTablet(request);
} else {
defaultSessionConnection.insertTablet(request);
}
} catch (RedirectException e) {
handleRedirection(tablet.deviceId, e.getEndPoint());
}
}
private TSInsertTabletReq genTSInsertTabletReq(Tablet tablet, boolean sorted)
throws BatchExecutionException {
if (sorted) {
checkSortedThrowable(tablet);
} else {
sortTablet(tablet);
}
TSInsertTabletReq request = new TSInsertTabletReq();
request.setDeviceId(tablet.deviceId);
for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
if (measurementSchema instanceof MeasurementSchema) {
request.addToMeasurements(measurementSchema.getMeasurementId());
request.addToTypes(measurementSchema.getType().ordinal());
} else {
int measurementsSize = measurementSchema.getValueMeasurementIdList().size();
StringBuilder measurement = new StringBuilder("(");
for (int i = 0; i < measurementsSize; i++) {
measurement.append(measurementSchema.getValueMeasurementIdList().get(i));
if (i != measurementsSize - 1) {
measurement.append(",");
} else {
measurement.append(")");
}
request.addToTypes(measurementSchema.getValueTSDataTypeList().get(i).ordinal());
}
request.addToMeasurements(measurement.toString());
}
}
request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
request.setValues(SessionUtils.getValueBuffer(tablet));
request.setSize(tablet.rowSize);
return request;
}
/**
* insert the data of several deivces. Given a deivce, for each timestamp, the number of
* measurements is the same.
*
* <p>Times in each Tablet may not be in ascending order
*
* @param tablets data batch in multiple device
*/
public void insertTablets(Map<String, Tablet> tablets)
throws IoTDBConnectionException, StatementExecutionException {
insertTablets(tablets, false);
}
/**
* insert the data of several devices. Given a device, for each timestamp, the number of
* measurements is the same.
*
* @param tablets data batch in multiple device
* @param sorted whether times in each Tablet are in ascending order
*/
public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
if (enableCacheLeader) {
insertTabletsWithLeaderCache(tablets, sorted);
} else {
TSInsertTabletsReq request = genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted);
try {
defaultSessionConnection.insertTablets(request);
} catch (RedirectException ignored) {
// ignored
}
}
}
private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
EndPoint endPoint;
SessionConnection connection;
Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
for (Entry<String, Tablet> entry : tablets.entrySet()) {
endPoint = deviceIdToEndpoint.get(entry.getKey());
if (endPoint != null) {
connection = endPointToSessionConnection.get(endPoint);
} else {
connection = defaultSessionConnection;
}
TSInsertTabletsReq request =
tabletGroup.computeIfAbsent(connection, k -> new TSInsertTabletsReq());
updateTSInsertTabletsReq(request, entry.getValue(), sorted);
}
// TODO parallel
StringBuilder errMsgBuilder = new StringBuilder();
for (Entry<SessionConnection, TSInsertTabletsReq> entry : tabletGroup.entrySet()) {
try {
entry.getKey().insertTablets(entry.getValue());
} catch (RedirectException e) {
for (Entry<String, EndPoint> deviceEndPointEntry : e.getDeviceEndPointMap().entrySet()) {
handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
}
} catch (StatementExecutionException e) {
errMsgBuilder.append(e.getMessage());
}
}
String errMsg = errMsgBuilder.toString();
if (!errMsg.isEmpty()) {
throw new StatementExecutionException(errMsg);
}
}
private TSInsertTabletsReq genTSInsertTabletsReq(List<Tablet> tablets, boolean sorted)
throws BatchExecutionException {
TSInsertTabletsReq request = new TSInsertTabletsReq();
for (Tablet tablet : tablets) {
updateTSInsertTabletsReq(request, tablet, sorted);
}
return request;
}
private void updateTSInsertTabletsReq(TSInsertTabletsReq request, Tablet tablet, boolean sorted)
throws BatchExecutionException {
if (sorted) {
checkSortedThrowable(tablet);
} else {
sortTablet(tablet);
}
request.addToDeviceIds(tablet.deviceId);
List<String> measurements = new ArrayList<>();
List<Integer> dataTypes = new ArrayList<>();
for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
measurements.add(measurementSchema.getMeasurementId());
dataTypes.add(measurementSchema.getType().ordinal());
}
request.addToMeasurementsList(measurements);
request.addToTypesList(dataTypes);
request.addToTimestampsList(SessionUtils.getTimeBuffer(tablet));
request.addToValuesList(SessionUtils.getValueBuffer(tablet));
request.addToSizeList(tablet.rowSize);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertTablet(Tablet tablet)
throws IoTDBConnectionException, StatementExecutionException {
testInsertTablet(tablet, false);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
defaultSessionConnection.testInsertTablet(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertTablets(Map<String, Tablet> tablets)
throws IoTDBConnectionException, StatementExecutionException {
testInsertTablets(tablets, false);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletsReq request = genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted);
defaultSessionConnection.testInsertTablets(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertRecords(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertStringRecordsReq request =
genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList);
defaultSessionConnection.testInsertRecords(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertRecords(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordsReq request =
genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList);
defaultSessionConnection.testInsertRecords(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertRecord(
String deviceId, long time, List<String> measurements, List<String> values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertStringRecordReq request =
genTSInsertStringRecordReq(deviceId, time, measurements, values);
defaultSessionConnection.testInsertRecord(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertRecord(
String deviceId,
long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types, values);
defaultSessionConnection.testInsertRecord(request);
}
/**
* delete a timeseries, including data and schema
*
* @param path timeseries to delete, should be a whole path
*/
public void deleteTimeseries(String path)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.deleteTimeseries(Collections.singletonList(path));
}
/**
* delete some timeseries, including data and schema
*
* @param paths timeseries to delete, should be a whole path
*/
public void deleteTimeseries(List<String> paths)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.deleteTimeseries(paths);
}
/**
* delete data <= time in one timeseries
*
* @param path data in which time series to delete
* @param endTime data with time stamp less than or equal to time will be deleted
*/
public void deleteData(String path, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
deleteData(Collections.singletonList(path), Long.MIN_VALUE, endTime);
}
/**
* delete data <= time in multiple timeseries
*
* @param paths data in which time series to delete
* @param endTime data with time stamp less than or equal to time will be deleted
*/
public void deleteData(List<String> paths, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
deleteData(paths, Long.MIN_VALUE, endTime);
}
/**
* delete data >= startTime and data <= endTime in multiple timeseries
*
* @param paths data in which time series to delete
* @param startTime delete range start time
* @param endTime delete range end time
*/
public void deleteData(List<String> paths, long startTime, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
TSDeleteDataReq request = genTSDeleteDataReq(paths, startTime, endTime);
defaultSessionConnection.deleteData(request);
}
private TSDeleteDataReq genTSDeleteDataReq(List<String> paths, long startTime, long endTime) {
TSDeleteDataReq request = new TSDeleteDataReq();
request.setPaths(paths);
request.setStartTime(startTime);
request.setEndTime(endTime);
return request;
}
private int calculateLength(List<TSDataType> types, List<Object> values)
throws IoTDBConnectionException {
int res = 0;
for (int i = 0; i < types.size(); i++) {
// types
res += Byte.BYTES;
switch (types.get(i)) {
case BOOLEAN:
res += 1;
break;
case INT32:
res += Integer.BYTES;
break;
case INT64:
res += Long.BYTES;
break;
case FLOAT:
res += Float.BYTES;
break;
case DOUBLE:
res += Double.BYTES;
break;
case TEXT:
res += Integer.BYTES;
res += ((String) values.get(i)).getBytes(TSFileConfig.STRING_CHARSET).length;
break;
default:
throw new IoTDBConnectionException(MSG_UNSUPPORTED_DATA_TYPE + types.get(i));
}
}
return res;
}
/**
* put value in buffer
*
* @param types types list
* @param values values list
* @param buffer buffer to insert
* @throws IoTDBConnectionException
*/
private void putValues(List<TSDataType> types, List<Object> values, ByteBuffer buffer)
throws IoTDBConnectionException {
for (int i = 0; i < values.size(); i++) {
if (values.get(i) == null) {
ReadWriteIOUtils.write(TYPE_NULL, buffer);
continue;
}
ReadWriteIOUtils.write(types.get(i), buffer);
switch (types.get(i)) {
case BOOLEAN:
ReadWriteIOUtils.write((Boolean) values.get(i), buffer);
break;
case INT32:
ReadWriteIOUtils.write((Integer) values.get(i), buffer);
break;
case INT64:
ReadWriteIOUtils.write((Long) values.get(i), buffer);
break;
case FLOAT:
ReadWriteIOUtils.write((Float) values.get(i), buffer);
break;
case DOUBLE:
ReadWriteIOUtils.write((Double) values.get(i), buffer);
break;
case TEXT:
byte[] bytes = ((String) values.get(i)).getBytes(TSFileConfig.STRING_CHARSET);
ReadWriteIOUtils.write(bytes.length, buffer);
buffer.put(bytes);
break;
default:
throw new IoTDBConnectionException(MSG_UNSUPPORTED_DATA_TYPE + types.get(i));
}
}
buffer.flip();
}
/**
* check whether the batch has been sorted
*
* @return whether the batch has been sorted
*/
private boolean checkSorted(Tablet tablet) {
for (int i = 1; i < tablet.rowSize; i++) {
if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
return false;
}
}
return true;
}
private boolean checkSorted(List<Long> times) {
for (int i = 1; i < times.size(); i++) {
if (times.get(i) < times.get(i - 1)) {
return false;
}
}
return true;
}
private void checkSortedThrowable(Tablet tablet) throws BatchExecutionException {
if (!checkSorted(tablet)) {
throw new BatchExecutionException("Times in Tablet are not in ascending order");
}
}
protected void sortTablet(Tablet tablet) {
/*
* following part of code sort the batch data by time,
* so we can insert continuous data in value list to get a better performance
*/
// sort to get index, and use index to sort value list
Integer[] index = new Integer[tablet.rowSize];
for (int i = 0; i < tablet.rowSize; i++) {
index[i] = i;
}
Arrays.sort(index, Comparator.comparingLong(o -> tablet.timestamps[o]));
Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
int columnIndex = 0;
for (int i = 0; i < tablet.getSchemas().size(); i++) {
IMeasurementSchema schema = tablet.getSchemas().get(i);
if (schema instanceof MeasurementSchema) {
tablet.values[columnIndex] = sortList(tablet.values[columnIndex], schema.getType(), index);
if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
tablet.bitMaps[columnIndex] = sortBitMap(tablet.bitMaps[columnIndex], index);
}
columnIndex++;
} else {
int measurementSize = schema.getValueMeasurementIdList().size();
for (int j = 0; j < measurementSize; j++) {
tablet.values[columnIndex] =
sortList(tablet.values[columnIndex], schema.getValueTSDataTypeList().get(j), index);
if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
tablet.bitMaps[columnIndex] = sortBitMap(tablet.bitMaps[columnIndex], index);
}
columnIndex++;
}
}
}
}
/**
* sort value list by index
*
* @param valueList value list
* @param dataType data type
* @param index index
* @return sorted list
*/
private Object sortList(Object valueList, TSDataType dataType, Integer[] index) {
switch (dataType) {
case BOOLEAN:
boolean[] boolValues = (boolean[]) valueList;
boolean[] sortedValues = new boolean[boolValues.length];
for (int i = 0; i < index.length; i++) {
sortedValues[i] = boolValues[index[i]];
}
return sortedValues;
case INT32:
int[] intValues = (int[]) valueList;
int[] sortedIntValues = new int[intValues.length];
for (int i = 0; i < index.length; i++) {
sortedIntValues[i] = intValues[index[i]];
}
return sortedIntValues;
case INT64:
long[] longValues = (long[]) valueList;
long[] sortedLongValues = new long[longValues.length];
for (int i = 0; i < index.length; i++) {
sortedLongValues[i] = longValues[index[i]];
}
return sortedLongValues;
case FLOAT:
float[] floatValues = (float[]) valueList;
float[] sortedFloatValues = new float[floatValues.length];
for (int i = 0; i < index.length; i++) {
sortedFloatValues[i] = floatValues[index[i]];
}
return sortedFloatValues;
case DOUBLE:
double[] doubleValues = (double[]) valueList;
double[] sortedDoubleValues = new double[doubleValues.length];
for (int i = 0; i < index.length; i++) {
sortedDoubleValues[i] = doubleValues[index[i]];
}
return sortedDoubleValues;
case TEXT:
Binary[] binaryValues = (Binary[]) valueList;
Binary[] sortedBinaryValues = new Binary[binaryValues.length];
for (int i = 0; i < index.length; i++) {
sortedBinaryValues[i] = binaryValues[index[i]];
}
return sortedBinaryValues;
default:
throw new UnSupportedDataTypeException(MSG_UNSUPPORTED_DATA_TYPE + dataType);
}
}
/**
* sort BitMap by index
*
* @param bitMap BitMap to be sorted
* @param index index
* @return sorted bitMap
*/
private BitMap sortBitMap(BitMap bitMap, Integer[] index) {
BitMap sortedBitMap = new BitMap(bitMap.getSize());
for (int i = 0; i < index.length; i++) {
if (bitMap.isMarked(index[i])) {
sortedBitMap.mark(i);
}
}
return sortedBitMap;
}
public void setDeviceTemplate(String templateName, String prefixPath)
throws IoTDBConnectionException, StatementExecutionException {
TSSetDeviceTemplateReq request = getTSSetDeviceTemplateReq(templateName, prefixPath);
defaultSessionConnection.setDeviceTemplate(request);
}
/**
* @param name template name
* @param measurements List of measurements, if it is a single measurement, just put it's name
* into a list and add to measurements if it is a vector measurement, put all measurements of
* the vector into a list and add to measurements
* @param dataTypes List of datatypes, if it is a single measurement, just put it's type into a
* list and add to dataTypes if it is a vector measurement, put all types of the vector into a
* list and add to dataTypes
* @param encodings List of encodings, if it is a single measurement, just put it's encoding into
* a list and add to encodings if it is a vector measurement, put all encodings of the vector
* into a list and add to encodings
* @param compressors List of compressors
* @throws IoTDBConnectionException
* @throws StatementExecutionException
*/
public void createDeviceTemplate(
String name,
List<List<String>> measurements,
List<List<TSDataType>> dataTypes,
List<List<TSEncoding>> encodings,
List<CompressionType> compressors)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateDeviceTemplateReq request =
getTSCreateDeviceTemplateReq(name, measurements, dataTypes, encodings, compressors);
defaultSessionConnection.createDeviceTemplate(request);
}
private TSSetDeviceTemplateReq getTSSetDeviceTemplateReq(String templateName, String prefixPath) {
TSSetDeviceTemplateReq request = new TSSetDeviceTemplateReq();
request.setTemplateName(templateName);
request.setPrefixPath(prefixPath);
return request;
}
private TSCreateDeviceTemplateReq getTSCreateDeviceTemplateReq(
String name,
List<List<String>> measurements,
List<List<TSDataType>> dataTypes,
List<List<TSEncoding>> encodings,
List<CompressionType> compressors) {
TSCreateDeviceTemplateReq request = new TSCreateDeviceTemplateReq();
request.setName(name);
request.setMeasurements(measurements);
List<List<Integer>> requestType = new ArrayList<>();
for (List<TSDataType> typesList : dataTypes) {
requestType.add(typesList.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
}
request.setDataTypes(requestType);
List<List<Integer>> requestEncoding = new ArrayList<>();
for (List<TSEncoding> encodingList : encodings) {
requestEncoding.add(
encodingList.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
}
request.setEncodings(requestEncoding);
request.setCompressors(
compressors.stream().map(CompressionType::ordinal).collect(Collectors.toList()));
return request;
}
public boolean isEnableQueryRedirection() {
return enableQueryRedirection;
}
public void setEnableQueryRedirection(boolean enableQueryRedirection) {
this.enableQueryRedirection = enableQueryRedirection;
}
}