blob: 39140136742684c5ef8be5ab2fc9770dffdc9b2c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.session;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, ignore todos
public class Session {
private static final Logger logger = LoggerFactory.getLogger(Session.class);
protected static final TSProtocolVersion protocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data type:";
protected String username;
protected String password;
protected int fetchSize;
protected boolean enableRPCCompression;
protected int connectionTimeoutInMs;
protected int initialBufferCapacity;
protected int maxFrameSize;
private EndPoint defaultEndPoint;
private SessionConnection defaultSessionConnection;
protected boolean isClosed = true;
private ZoneId zoneId;
// Cluster version cache
private SessionConnection metaSessionConnection;
private Map<String, EndPoint> deviceIdToEndpoint;
private Map<EndPoint, SessionConnection> endPointToSessionConnection;
private AtomicReference<IoTDBConnectionException> tmp = new AtomicReference<>();
public Session(String host, int rpcPort) {
this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, Config.DEFAULT_FETCH_SIZE,
null, Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
}
public Session(String host, String rpcPort, String username, String password) {
this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
}
public Session(String host, int rpcPort, String username, String password) {
this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
}
public Session(String host, int rpcPort, String username, String password, int fetchSize) {
this(host, rpcPort, username, password, fetchSize, null,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
}
public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
}
public Session(String host, int rpcPort, String username, String password, int fetchSize,
ZoneId zoneId) {
this(host, rpcPort, username, password, fetchSize, zoneId,
Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
}
@SuppressWarnings("squid:S107")
public Session(String host, int rpcPort, String username, String password, int fetchSize,
ZoneId zoneId, int initialBufferCapacity, int maxFrameSize) {
this.defaultEndPoint = new EndPoint(host, rpcPort);
this.username = username;
this.password = password;
this.fetchSize = fetchSize;
this.zoneId = zoneId;
this.initialBufferCapacity = initialBufferCapacity;
this.maxFrameSize = maxFrameSize;
}
public void setFetchSize(int fetchSize) {
this.fetchSize = fetchSize;
}
public int getFetchSize() {
return this.fetchSize;
}
public synchronized void open() throws IoTDBConnectionException {
open(false, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
public synchronized void open(boolean enableRPCCompression) throws IoTDBConnectionException {
open(enableRPCCompression, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
throws IoTDBConnectionException {
if (!isClosed) {
return;
}
this.enableRPCCompression = enableRPCCompression;
this.connectionTimeoutInMs = connectionTimeoutInMs;
defaultSessionConnection = new SessionConnection(this, defaultEndPoint, zoneId);
metaSessionConnection = defaultSessionConnection;
isClosed = false;
if (Config.DEFAULT_CACHE_LEADER_MODE) {
deviceIdToEndpoint = new HashMap<>();
endPointToSessionConnection = new HashMap<>();
endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
}
}
public synchronized void close() throws IoTDBConnectionException {
if (isClosed) {
return;
}
try {
if (Config.DEFAULT_CACHE_LEADER_MODE) {
for (SessionConnection sessionConnection : endPointToSessionConnection.values()) {
sessionConnection.close();
}
} else {
defaultSessionConnection.close();
}
} finally {
isClosed = true;
}
}
public synchronized String getTimeZone() {
return defaultSessionConnection.getTimeZone();
}
public synchronized void setTimeZone(String zoneId)
throws StatementExecutionException, IoTDBConnectionException {
defaultSessionConnection.setTimeZone(zoneId);
}
public void setStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
try {
metaSessionConnection.setStorageGroup(storageGroup);
} catch (RedirectException e) {
handleMetaRedirection(storageGroup, e);
}
}
public void deleteStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
try {
metaSessionConnection.deleteStorageGroups(Collections.singletonList(storageGroup));
} catch (RedirectException e) {
handleMetaRedirection(storageGroup, e);
}
}
public void deleteStorageGroups(List<String> storageGroups)
throws IoTDBConnectionException, StatementExecutionException {
try {
metaSessionConnection.deleteStorageGroups(storageGroups);
} catch (RedirectException e) {
handleMetaRedirection(storageGroups.toString(), e);
}
}
public void createTimeseries(String path, TSDataType dataType,
TSEncoding encoding, CompressionType compressor)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateTimeseriesReq request = genTSCreateTimeseriesReq(path, dataType, encoding, compressor,
null, null, null, null);
defaultSessionConnection.createTimeseries(request);
}
public void createTimeseries(String path, TSDataType dataType,
TSEncoding encoding, CompressionType compressor, Map<String, String> props,
Map<String, String> tags, Map<String, String> attributes, String measurementAlias)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateTimeseriesReq request = genTSCreateTimeseriesReq(path, dataType, encoding, compressor,
props, tags, attributes, measurementAlias);
defaultSessionConnection.createTimeseries(request);
}
private TSCreateTimeseriesReq genTSCreateTimeseriesReq(String path, TSDataType dataType,
TSEncoding encoding, CompressionType compressor, Map<String, String> props,
Map<String, String> tags, Map<String, String> attributes, String measurementAlias) {
TSCreateTimeseriesReq request = new TSCreateTimeseriesReq();
request.setPath(path);
request.setDataType(dataType.ordinal());
request.setEncoding(encoding.ordinal());
request.setCompressor(compressor.ordinal());
request.setProps(props);
request.setTags(tags);
request.setAttributes(attributes);
request.setMeasurementAlias(measurementAlias);
return request;
}
public void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes,
List<TSEncoding> encodings, List<CompressionType> compressors,
List<Map<String, String>> propsList, List<Map<String, String>> tagsList,
List<Map<String, String>> attributesList, List<String> measurementAliasList)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateMultiTimeseriesReq request = genTSCreateMultiTimeseriesReq(paths, dataTypes, encodings,
compressors, propsList, tagsList, attributesList, measurementAliasList);
defaultSessionConnection.createMultiTimeseries(request);
}
private TSCreateMultiTimeseriesReq genTSCreateMultiTimeseriesReq(List<String> paths,
List<TSDataType> dataTypes,
List<TSEncoding> encodings, List<CompressionType> compressors,
List<Map<String, String>> propsList, List<Map<String, String>> tagsList,
List<Map<String, String>> attributesList, List<String> measurementAliasList) {
TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
request.setPaths(paths);
List<Integer> dataTypeOrdinals = new ArrayList<>(paths.size());
for (TSDataType dataType : dataTypes) {
dataTypeOrdinals.add(dataType.ordinal());
}
request.setDataTypes(dataTypeOrdinals);
List<Integer> encodingOrdinals = new ArrayList<>(paths.size());
for (TSEncoding encoding : encodings) {
encodingOrdinals.add(encoding.ordinal());
}
request.setEncodings(encodingOrdinals);
List<Integer> compressionOrdinals = new ArrayList<>(paths.size());
for (CompressionType compression : compressors) {
compressionOrdinals.add(compression.ordinal());
}
request.setCompressors(compressionOrdinals);
request.setPropsList(propsList);
request.setTagsList(tagsList);
request.setAttributesList(attributesList);
request.setMeasurementAliasList(measurementAliasList);
return request;
}
public boolean checkTimeseriesExists(String path)
throws IoTDBConnectionException, StatementExecutionException {
return defaultSessionConnection.checkTimeseriesExists(path);
}
/**
* execute query sql
*
* @param sql query statement
* @return result set
*/
public SessionDataSet executeQueryStatement(String sql)
throws StatementExecutionException, IoTDBConnectionException {
return defaultSessionConnection.executeQueryStatement(sql);
}
/**
* execute query sql with explicit timeout
*
* @param sql query statement
* @param timeoutInMs the timeout of this query, in milliseconds
* @return result set
*/
public SessionDataSet executeQueryStatement(String sql, long timeoutInMs)
throws StatementExecutionException, IoTDBConnectionException {
if (timeoutInMs <= 0) {
throw new StatementExecutionException("Timeout must be over 0, please check and try again.");
}
return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
}
/**
* execute non query statement
*
* @param sql non query statement
*/
public void executeNonQueryStatement(String sql)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.executeNonQueryStatement(sql);
}
/**
* query eg. select * from paths where time >= startTime and time < endTime time interval include
* startTime and exclude endTime
*
* @param paths
* @param startTime included
* @param endTime excluded
* @return
* @throws StatementExecutionException
* @throws IoTDBConnectionException
*/
public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
throws StatementExecutionException, IoTDBConnectionException {
return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
}
/**
* insert data in one row, if you want to improve your performance, please use insertRecords
* method or insertTablet method
*
* @see Session#insertRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
public void insertRecord(String deviceId, long time, List<String> measurements,
List<TSDataType> types,
Object... values) throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types,
Arrays.asList(values));
insertRecord(deviceId, request);
}
private void insertRecord(String deviceId, TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
try {
getSessionConnection(deviceId).insertRecord(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
}
}
private void insertRecord(String deviceId, TSInsertStringRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
try {
getSessionConnection(deviceId).insertRecord(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
}
}
private SessionConnection getSessionConnection(String deviceId) {
EndPoint endPoint;
if (Config.DEFAULT_CACHE_LEADER_MODE
&& (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
return endPointToSessionConnection.get(endPoint);
} else {
return defaultSessionConnection;
}
}
private void handleMetaRedirection(String storageGroup, RedirectException e)
throws IoTDBConnectionException {
if (Config.DEFAULT_CACHE_LEADER_MODE) {
logger.debug("storageGroup[{}]:{}", storageGroup, e.getMessage());
SessionConnection connection = endPointToSessionConnection
.computeIfAbsent(e.getEndPoint(), k -> {
try {
return new SessionConnection(this, e.getEndPoint(), zoneId);
} catch (IoTDBConnectionException ex) {
tmp.set(ex);
return null;
}
});
if (connection == null) {
throw new IoTDBConnectionException(tmp.get());
}
metaSessionConnection = connection;
}
}
private void handleRedirection(String deviceId, EndPoint endpoint)
throws IoTDBConnectionException {
if (Config.DEFAULT_CACHE_LEADER_MODE) {
deviceIdToEndpoint.put(deviceId, endpoint);
SessionConnection connection = endPointToSessionConnection
.computeIfAbsent(endpoint, k -> {
try {
return new SessionConnection(this, endpoint, zoneId);
} catch (IoTDBConnectionException ex) {
tmp.set(ex);
return null;
}
});
if (connection == null) {
throw new IoTDBConnectionException(tmp.get());
}
}
}
/**
* insert data in one row, if you want improve your performance, please use insertInBatch method
* or insertBatch method
*
* @see Session#insertRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
public void insertRecord(String deviceId, long time, List<String> measurements,
List<TSDataType> types,
List<Object> values) throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types, values);
insertRecord(deviceId, request);
}
private TSInsertRecordReq genTSInsertRecordReq(String deviceId, long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values) throws IoTDBConnectionException {
TSInsertRecordReq request = new TSInsertRecordReq();
request.setDeviceId(deviceId);
request.setTimestamp(time);
request.setMeasurements(measurements);
ByteBuffer buffer = ByteBuffer.allocate(calculateLength(types, values));
putValues(types, values, buffer);
request.setValues(buffer);
return request;
}
/**
* insert data in one row, if you want improve your performance, please use insertInBatch method
* or insertBatch method
*
* @see Session#insertRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
public void insertRecord(String deviceId, long time, List<String> measurements,
List<String> values) throws IoTDBConnectionException, StatementExecutionException {
TSInsertStringRecordReq request = genTSInsertStringRecordReq(deviceId, time, measurements,
values);
insertRecord(deviceId, request);
}
private TSInsertStringRecordReq genTSInsertStringRecordReq(String deviceId, long time,
List<String> measurements, List<String> values) {
TSInsertStringRecordReq request = new TSInsertStringRecordReq();
request.setDeviceId(deviceId);
request.setTimestamp(time);
request.setMeasurements(measurements);
request.setValues(values);
return request;
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
* <p>
* Each row is independent, which could have different deviceId, time, number of measurements
*
* @see Session#insertTablet(Tablet)
*/
public void insertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
if (Config.DEFAULT_CACHE_LEADER_MODE) {
insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList);
} else {
TSInsertStringRecordsReq request = genTSInsertStringRecordsReq(deviceIds, times,
measurementsList, valuesList);
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException ignored) {
// ignore
}
}
}
private void insertStringRecordsWithLeaderCache(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
Map<String, TSInsertStringRecordsReq> deviceGroup = new HashMap<>();
for (int i = 0; i < deviceIds.size(); i++) {
TSInsertStringRecordsReq request = deviceGroup
.computeIfAbsent(deviceIds.get(i), k -> new TSInsertStringRecordsReq());
updateTSInsertStringRecordsReq(request, deviceIds.get(i), times.get(i),
measurementsList.get(i), valuesList.get(i));
}
//TODO parallel
StringBuilder errMsgBuilder = new StringBuilder();
for (Entry<String, TSInsertStringRecordsReq> entry : deviceGroup.entrySet()) {
try {
getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
} catch (RedirectException e) {
handleRedirection(entry.getKey(), e.getEndPoint());
} catch (StatementExecutionException e) {
errMsgBuilder.append(e.getMessage());
}
}
String errMsg = errMsgBuilder.toString();
if (!errMsg.isEmpty()) {
throw new StatementExecutionException(errMsg);
}
}
private TSInsertStringRecordsReq genTSInsertStringRecordsReq(List<String> deviceId,
List<Long> time,
List<List<String>> measurements, List<List<String>> values) {
TSInsertStringRecordsReq request = new TSInsertStringRecordsReq();
request.setDeviceIds(deviceId);
request.setTimestamps(time);
request.setMeasurementsList(measurements);
request.setValuesList(values);
return request;
}
private void updateTSInsertStringRecordsReq(TSInsertStringRecordsReq request,
String deviceId, long time,
List<String> measurements, List<String> values) {
request.addToDeviceIds(deviceId);
request.addToTimestamps(time);
request.addToMeasurementsList(measurements);
request.addToValuesList(values);
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
* <p>
* Each row is independent, which could have different deviceId, time, number of measurements
*
* @see Session#insertTablet(Tablet)
*/
public void insertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
if (Config.DEFAULT_CACHE_LEADER_MODE) {
insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList);
} else {
TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times, measurementsList,
typesList, valuesList);
try {
defaultSessionConnection
.insertRecords(request);
} catch (RedirectException ignored) {
// ignore
}
}
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
* <p>
* Each row is independent, which could have different deviceId, time, number of measurements
*
* @see Session#insertTablet(Tablet)
*/
public void insertRecordsOfOneDevice(String deviceId, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
* <p>
* Each row is independent, which could have different deviceId, time, number of measurements
*
* @param haveSorted whether the times have been sorted
* @see Session#insertTablet(Tablet)
*/
public void insertRecordsOfOneDevice(String deviceId, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList, boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
int len = times.size();
if (len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times,
measurementsList, typesList, valuesList, haveSorted);
try {
getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
}
}
private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId,
List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList, boolean haveSorted)
throws IoTDBConnectionException, BatchExecutionException {
// check params size
int len = times.size();
if (len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"times, measurementsList and valuesList's size should be equal");
}
if (haveSorted) {
if (!checkSorted(times)) {
throw new BatchExecutionException(
"Times in InsertOneDeviceRecords are not in ascending order");
}
} else {
//sort
Integer[] index = new Integer[times.size()];
for (int i = 0; i < times.size(); i++) {
index[i] = i;
}
Arrays.sort(index, Comparator.comparingLong(times::get));
times.sort(Long::compareTo);
//sort measurementList
measurementsList = sortList(measurementsList, index);
//sort typesList
typesList = sortList(typesList, index);
//sort values
valuesList = sortList(valuesList, index);
}
TSInsertRecordsOfOneDeviceReq request = new TSInsertRecordsOfOneDeviceReq();
request.setDeviceId(deviceId);
request.setTimestamps(times);
request.setMeasurementsList(measurementsList);
List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
request.setValuesList(buffersList);
return request;
}
@SuppressWarnings("squid:S3740")
private List sortList(List source, Integer[] index) {
Object[] result = new Object[source.size()];
for (int i = 0; i < index.length; i++) {
result[i] = source.get(index[i]);
}
return Arrays.asList(result);
}
private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList,
List<List<TSDataType>> typesList) throws IoTDBConnectionException {
List<ByteBuffer> buffersList = new ArrayList<>();
for (int i = 0; i < valuesList.size(); i++) {
ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
putValues(typesList.get(i), valuesList.get(i), buffer);
buffersList.add(buffer);
}
return buffersList;
}
private void insertRecordsWithLeaderCache(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
Map<String, TSInsertRecordsReq> deviceGroup = new HashMap<>();
for (int i = 0; i < deviceIds.size(); i++) {
TSInsertRecordsReq request = deviceGroup
.computeIfAbsent(deviceIds.get(i), k -> new TSInsertRecordsReq());
updateTSInsertRecordsReq(request, deviceIds.get(i), times.get(i),
measurementsList.get(i), typesList.get(i), valuesList.get(i));
}
//TODO parallel
StringBuilder errMsgBuilder = new StringBuilder();
for (Entry<String, TSInsertRecordsReq> entry : deviceGroup.entrySet()) {
try {
getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
} catch (RedirectException e) {
handleRedirection(entry.getKey(), e.getEndPoint());
} catch (StatementExecutionException e) {
errMsgBuilder.append(e.getMessage());
}
}
String errMsg = errMsgBuilder.toString();
if (!errMsg.isEmpty()) {
throw new StatementExecutionException(errMsg);
}
}
private TSInsertRecordsReq genTSInsertRecordsReq(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList) throws IoTDBConnectionException {
TSInsertRecordsReq request = new TSInsertRecordsReq();
request.setDeviceIds(deviceIds);
request.setTimestamps(times);
request.setMeasurementsList(measurementsList);
List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
request.setValuesList(buffersList);
return request;
}
private void updateTSInsertRecordsReq(TSInsertRecordsReq request, String deviceId, Long time,
List<String> measurements, List<TSDataType> types,
List<Object> values) throws IoTDBConnectionException {
request.addToDeviceIds(deviceId);
request.addToTimestamps(time);
request.addToMeasurementsList(measurements);
ByteBuffer buffer = ByteBuffer.allocate(calculateLength(types, values));
putValues(types, values, buffer);
request.addToValuesList(buffer);
}
/**
* insert the data of a device. For each timestamp, the number of measurements is the same.
* <p>
* a Tablet example: device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3
* <p/>
* times in Tablet may be not in ascending order
*
* @param tablet data batch
*/
public void insertTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
EndPoint endPoint;
try {
if (Config.DEFAULT_CACHE_LEADER_MODE
&& (endPoint = deviceIdToEndpoint.get(tablet.deviceId)) != null) {
endPointToSessionConnection.get(endPoint).insertTablet(request);
} else {
defaultSessionConnection.insertTablet(request);
}
} catch (RedirectException e) {
handleRedirection(tablet.deviceId, e.getEndPoint());
}
}
/**
* insert a Tablet
*
* @param tablet data batch
* @param sorted whether times in Tablet are in ascending order
*/
public void insertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
EndPoint endPoint;
try {
if (Config.DEFAULT_CACHE_LEADER_MODE
&& (endPoint = deviceIdToEndpoint.get(tablet.deviceId)) != null) {
endPointToSessionConnection.get(endPoint).insertTablet(request);
} else {
defaultSessionConnection.insertTablet(request);
}
} catch (RedirectException e) {
handleRedirection(tablet.deviceId, e.getEndPoint());
}
}
private TSInsertTabletReq genTSInsertTabletReq(Tablet tablet, boolean sorted)
throws BatchExecutionException {
if (sorted) {
checkSortedThrowable(tablet);
} else {
sortTablet(tablet);
}
TSInsertTabletReq request = new TSInsertTabletReq();
request.setDeviceId(tablet.deviceId);
for (MeasurementSchema measurementSchema : tablet.getSchemas()) {
request.addToMeasurements(measurementSchema.getMeasurementId());
request.addToTypes(measurementSchema.getType().ordinal());
}
request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
request.setValues(SessionUtils.getValueBuffer(tablet));
request.setSize(tablet.rowSize);
return request;
}
/**
* insert the data of several deivces. Given a deivce, for each timestamp, the number of
* measurements is the same.
* <p>
* Times in each Tablet may not be in ascending order
*
* @param tablets data batch in multiple device
*/
public void insertTablets(Map<String, Tablet> tablets)
throws IoTDBConnectionException, StatementExecutionException {
insertTablets(tablets, false);
}
/**
* insert the data of several devices. Given a device, for each timestamp, the number of
* measurements is the same.
*
* @param tablets data batch in multiple device
* @param sorted whether times in each Tablet are in ascending order
*/
public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
if (Config.DEFAULT_CACHE_LEADER_MODE) {
insertTabletsWithLeaderCache(tablets, sorted);
} else {
TSInsertTabletsReq request = genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted);
try {
defaultSessionConnection.insertTablets(request);
} catch (RedirectException ignored) {
// ignored
}
}
}
private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean sorted) throws
IoTDBConnectionException, StatementExecutionException {
EndPoint endPoint;
SessionConnection connection;
Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
for (Entry<String, Tablet> entry : tablets.entrySet()) {
endPoint = deviceIdToEndpoint.get(entry.getKey());
if (endPoint != null) {
connection = endPointToSessionConnection.get(endPoint);
} else {
connection = defaultSessionConnection;
}
TSInsertTabletsReq request = tabletGroup
.computeIfAbsent(connection, k -> new TSInsertTabletsReq());
updateTSInsertTabletsReq(request, entry.getValue(), sorted);
}
//TODO parallel
StringBuilder errMsgBuilder = new StringBuilder();
for (Entry<SessionConnection, TSInsertTabletsReq> entry : tabletGroup.entrySet()) {
try {
entry.getKey().insertTablets(entry.getValue());
} catch (RedirectException e) {
for (Entry<String, EndPoint> deviceEndPointEntry : e.getDeviceEndPointMap().entrySet()) {
handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
}
} catch (StatementExecutionException e) {
errMsgBuilder.append(e.getMessage());
}
}
String errMsg = errMsgBuilder.toString();
if (!errMsg.isEmpty()) {
throw new StatementExecutionException(errMsg);
}
}
private TSInsertTabletsReq genTSInsertTabletsReq(List<Tablet> tablets, boolean sorted)
throws BatchExecutionException {
TSInsertTabletsReq request = new TSInsertTabletsReq();
for (Tablet tablet : tablets) {
updateTSInsertTabletsReq(request, tablet, sorted);
}
return request;
}
private void updateTSInsertTabletsReq(TSInsertTabletsReq request, Tablet tablet, boolean sorted)
throws BatchExecutionException {
if (sorted) {
checkSortedThrowable(tablet);
} else {
sortTablet(tablet);
}
request.addToDeviceIds(tablet.deviceId);
List<String> measurements = new ArrayList<>();
List<Integer> dataTypes = new ArrayList<>();
for (MeasurementSchema measurementSchema : tablet.getSchemas()) {
measurements.add(measurementSchema.getMeasurementId());
dataTypes.add(measurementSchema.getType().ordinal());
}
request.addToMeasurementsList(measurements);
request.addToTypesList(dataTypes);
request.addToTimestampsList(SessionUtils.getTimeBuffer(tablet));
request.addToValuesList(SessionUtils.getValueBuffer(tablet));
request.addToSizeList(tablet.rowSize);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertTablet(Tablet tablet)
throws IoTDBConnectionException, StatementExecutionException {
testInsertTablet(tablet, false);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
defaultSessionConnection.testInsertTablet(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertTablets(Map<String, Tablet> tablets)
throws IoTDBConnectionException, StatementExecutionException {
testInsertTablets(tablets, false);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletsReq request = genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted);
defaultSessionConnection.testInsertTablets(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertStringRecordsReq request = genTSInsertStringRecordsReq(deviceIds, times,
measurementsList, valuesList);
defaultSessionConnection.testInsertRecords(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times, measurementsList,
typesList, valuesList);
defaultSessionConnection.testInsertRecords(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertRecord(String deviceId, long time, List<String> measurements,
List<String> values) throws IoTDBConnectionException, StatementExecutionException {
TSInsertStringRecordReq request = genTSInsertStringRecordReq(deviceId, time, measurements,
values);
defaultSessionConnection.testInsertRecord(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
public void testInsertRecord(String deviceId, long time, List<String> measurements,
List<TSDataType> types, List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types, values);
defaultSessionConnection.testInsertRecord(request);
}
/**
* delete a timeseries, including data and schema
*
* @param path timeseries to delete, should be a whole path
*/
public void deleteTimeseries(String path)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.deleteTimeseries(Collections.singletonList(path));
}
/**
* delete some timeseries, including data and schema
*
* @param paths timeseries to delete, should be a whole path
*/
public void deleteTimeseries(List<String> paths)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.deleteTimeseries(paths);
}
/**
* delete data <= time in one timeseries
*
* @param path data in which time series to delete
* @param endTime data with time stamp less than or equal to time will be deleted
*/
public void deleteData(String path, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
deleteData(Collections.singletonList(path), Long.MIN_VALUE, endTime);
}
/**
* delete data <= time in multiple timeseries
*
* @param paths data in which time series to delete
* @param endTime data with time stamp less than or equal to time will be deleted
*/
public void deleteData(List<String> paths, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
deleteData(paths, Long.MIN_VALUE, endTime);
}
/**
* delete data >= startTime and data <= endTime in multiple timeseries
*
* @param paths data in which time series to delete
* @param startTime delete range start time
* @param endTime delete range end time
*/
public void deleteData(List<String> paths, long startTime, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
TSDeleteDataReq request = genTSDeleteDataReq(paths, startTime, endTime);
defaultSessionConnection.deleteData(request);
}
private TSDeleteDataReq genTSDeleteDataReq(List<String> paths, long startTime, long endTime) {
TSDeleteDataReq request = new TSDeleteDataReq();
request.setPaths(paths);
request.setStartTime(startTime);
request.setEndTime(endTime);
return request;
}
private int calculateLength(List<TSDataType> types, List<Object> values)
throws IoTDBConnectionException {
int res = 0;
for (int i = 0; i < types.size(); i++) {
// types
res += Short.BYTES;
switch (types.get(i)) {
case BOOLEAN:
res += 1;
break;
case INT32:
res += Integer.BYTES;
break;
case INT64:
res += Long.BYTES;
break;
case FLOAT:
res += Float.BYTES;
break;
case DOUBLE:
res += Double.BYTES;
break;
case TEXT:
res += Integer.BYTES;
res += ((String) values.get(i)).getBytes(TSFileConfig.STRING_CHARSET).length;
break;
default:
throw new IoTDBConnectionException(MSG_UNSUPPORTED_DATA_TYPE + types.get(i));
}
}
return res;
}
/**
* put value in buffer
*
* @param types types list
* @param values values list
* @param buffer buffer to insert
* @throws IoTDBConnectionException
*/
private void putValues(List<TSDataType> types, List<Object> values, ByteBuffer buffer)
throws IoTDBConnectionException {
for (int i = 0; i < values.size(); i++) {
ReadWriteIOUtils.write(types.get(i), buffer);
switch (types.get(i)) {
case BOOLEAN:
ReadWriteIOUtils.write((Boolean) values.get(i), buffer);
break;
case INT32:
ReadWriteIOUtils.write((Integer) values.get(i), buffer);
break;
case INT64:
ReadWriteIOUtils.write((Long) values.get(i), buffer);
break;
case FLOAT:
ReadWriteIOUtils.write((Float) values.get(i), buffer);
break;
case DOUBLE:
ReadWriteIOUtils.write((Double) values.get(i), buffer);
break;
case TEXT:
byte[] bytes = ((String) values.get(i)).getBytes(TSFileConfig.STRING_CHARSET);
ReadWriteIOUtils.write(bytes.length, buffer);
buffer.put(bytes);
break;
default:
throw new IoTDBConnectionException(MSG_UNSUPPORTED_DATA_TYPE + types.get(i));
}
}
buffer.flip();
}
/**
* check whether the batch has been sorted
*
* @return whether the batch has been sorted
*/
private boolean checkSorted(Tablet tablet) {
for (int i = 1; i < tablet.rowSize; i++) {
if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
return false;
}
}
return true;
}
private boolean checkSorted(List<Long> times) {
for (int i = 1; i < times.size(); i++) {
if (times.get(i) < times.get(i - 1)) {
return false;
}
}
return true;
}
private void checkSortedThrowable(Tablet tablet) throws BatchExecutionException {
if (!checkSorted(tablet)) {
throw new BatchExecutionException("Times in Tablet are not in ascending order");
}
}
protected void sortTablet(Tablet tablet) {
/*
* following part of code sort the batch data by time,
* so we can insert continuous data in value list to get a better performance
*/
// sort to get index, and use index to sort value list
Integer[] index = new Integer[tablet.rowSize];
for (int i = 0; i < tablet.rowSize; i++) {
index[i] = i;
}
Arrays.sort(index, Comparator.comparingLong(o -> tablet.timestamps[o]));
Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
for (int i = 0; i < tablet.getSchemas().size(); i++) {
tablet.values[i] =
sortList(tablet.values[i], tablet.getSchemas().get(i).getType(), index);
}
}
/**
* sort value list by index
*
* @param valueList value list
* @param dataType data type
* @param index index
* @return sorted list
*/
private Object sortList(Object valueList, TSDataType dataType, Integer[] index) {
switch (dataType) {
case BOOLEAN:
boolean[] boolValues = (boolean[]) valueList;
boolean[] sortedValues = new boolean[boolValues.length];
for (int i = 0; i < index.length; i++) {
sortedValues[i] = boolValues[index[i]];
}
return sortedValues;
case INT32:
int[] intValues = (int[]) valueList;
int[] sortedIntValues = new int[intValues.length];
for (int i = 0; i < index.length; i++) {
sortedIntValues[i] = intValues[index[i]];
}
return sortedIntValues;
case INT64:
long[] longValues = (long[]) valueList;
long[] sortedLongValues = new long[longValues.length];
for (int i = 0; i < index.length; i++) {
sortedLongValues[i] = longValues[index[i]];
}
return sortedLongValues;
case FLOAT:
float[] floatValues = (float[]) valueList;
float[] sortedFloatValues = new float[floatValues.length];
for (int i = 0; i < index.length; i++) {
sortedFloatValues[i] = floatValues[index[i]];
}
return sortedFloatValues;
case DOUBLE:
double[] doubleValues = (double[]) valueList;
double[] sortedDoubleValues = new double[doubleValues.length];
for (int i = 0; i < index.length; i++) {
sortedDoubleValues[i] = doubleValues[index[i]];
}
return sortedDoubleValues;
case TEXT:
Binary[] binaryValues = (Binary[]) valueList;
Binary[] sortedBinaryValues = new Binary[binaryValues.length];
for (int i = 0; i < index.length; i++) {
sortedBinaryValues[i] = binaryValues[index[i]];
}
return sortedBinaryValues;
default:
throw new UnSupportedDataTypeException(MSG_UNSUPPORTED_DATA_TYPE + dataType);
}
}
}