blob: b87d8937a1e79e2324554c394c08cecfd1fc8321 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.session;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.INodeSupplier;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.isession.template.Template;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.NoValidValueException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TCreateTimeseriesUsingSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
import org.apache.iotdb.session.template.MeasurementNode;
import org.apache.iotdb.session.template.TemplateQueryType;
import org.apache.iotdb.session.util.SessionUtils;
import org.apache.iotdb.session.util.ThreadUtils;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, ignore todos
public class Session implements ISession {
private static final Logger logger = LoggerFactory.getLogger(Session.class);
protected static final TSProtocolVersion protocolVersion =
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data type:";
public static final String MSG_DONOT_ENABLE_REDIRECT =
"Query do not enable redirect," + " please confirm the session and server conf.";
private static final ThreadPoolExecutor OPERATION_EXECUTOR =
new ThreadPoolExecutor(
SessionConfig.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
SessionConfig.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
0,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(SessionConfig.DEFAULT_SESSION_EXECUTOR_TASK_NUM),
ThreadUtils.createThreadFactory("SessionExecutor", true));
protected List<String> nodeUrls;
protected String username;
protected String password;
protected int fetchSize;
protected boolean useSSL;
protected String trustStore;
protected String trustStorePwd;
/**
* Timeout of query can be set by users. A negative number means using the default configuration
* of server. And value 0 will disable the function of query timeout.
*/
private long queryTimeoutInMs = -1;
protected boolean enableRPCCompression;
protected int connectionTimeoutInMs;
protected ZoneId zoneId;
protected int thriftDefaultBufferSize;
protected int thriftMaxFrameSize;
protected TEndPoint defaultEndPoint;
protected SessionConnection defaultSessionConnection;
private boolean isClosed = true;
// Cluster version cache
protected boolean enableRedirection;
protected boolean enableRecordsAutoConvertTablet =
SessionConfig.DEFAULT_RECORDS_AUTO_CONVERT_TABLET;
private static final double CONVERT_THRESHOLD = 0.5;
private static final double SAMPLE_PROPORTION = 0.05;
private static final int MIN_RECORDS_SIZE = 40;
@SuppressWarnings("squid:S3077") // Non-primitive fields should not be "volatile"
protected volatile Map<String, TEndPoint> deviceIdToEndpoint;
@SuppressWarnings("squid:S3077") // Non-primitive fields should not be "volatile"
protected volatile Map<TEndPoint, SessionConnection> endPointToSessionConnection;
// used to update datanodeList periodically
protected volatile ScheduledExecutorService executorService;
protected INodeSupplier availableNodes;
protected boolean enableQueryRedirection = false;
// The version number of the client which used for compatibility in the server
protected Version version;
// default enable
protected boolean enableAutoFetch = true;
protected int maxRetryCount = SessionConfig.MAX_RETRY_COUNT;
protected long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS;
private static final String REDIRECT_TWICE = "redirect twice";
private static final String REDIRECT_TWICE_RETRY = "redirect twice, please try again.";
private static final String VALUES_SIZE_SHOULD_BE_EQUAL =
"times, measurementsList and valuesList's size should be equal";
private static final String SESSION_CANNOT_CONNECT = "Session can not connect to {}";
private static final String ALL_VALUES_ARE_NULL =
"All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]";
private static final String ALL_VALUES_ARE_NULL_WITH_TIME =
"All values are null and this submission is ignored,deviceId is [{}],times are [{}],measurements are [{}]";
private static final String ALL_VALUES_ARE_NULL_MULTI_DEVICES =
"All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]";
private static final String ALL_INSERT_DATA_IS_NULL = "All inserted data is null.";
public Session(String host, int rpcPort) {
this(
host,
rpcPort,
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
SessionConfig.DEFAULT_FETCH_SIZE,
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
public Session(String host, String rpcPort, String username, String password) {
this(
host,
Integer.parseInt(rpcPort),
username,
password,
SessionConfig.DEFAULT_FETCH_SIZE,
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
public Session(String host, int rpcPort, String username, String password) {
this(
host,
rpcPort,
username,
password,
SessionConfig.DEFAULT_FETCH_SIZE,
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
public Session(String host, int rpcPort, String username, String password, int fetchSize) {
this(
host,
rpcPort,
username,
password,
fetchSize,
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
public Session(
String host,
int rpcPort,
String username,
String password,
int fetchSize,
long queryTimeoutInMs) {
this(
host,
rpcPort,
username,
password,
fetchSize,
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
this.queryTimeoutInMs = queryTimeoutInMs;
}
public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
this(
host,
rpcPort,
username,
password,
SessionConfig.DEFAULT_FETCH_SIZE,
zoneId,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
public Session(
String host, int rpcPort, String username, String password, boolean enableRedirection) {
this(
host,
rpcPort,
username,
password,
SessionConfig.DEFAULT_FETCH_SIZE,
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
enableRedirection,
SessionConfig.DEFAULT_VERSION);
}
public Session(
String host,
int rpcPort,
String username,
String password,
int fetchSize,
ZoneId zoneId,
boolean enableRedirection) {
this(
host,
rpcPort,
username,
password,
fetchSize,
zoneId,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
enableRedirection,
SessionConfig.DEFAULT_VERSION);
}
@SuppressWarnings("squid:S107")
public Session(
String host,
int rpcPort,
String username,
String password,
int fetchSize,
ZoneId zoneId,
int thriftDefaultBufferSize,
int thriftMaxFrameSize,
boolean enableRedirection,
Version version) {
this.defaultEndPoint = new TEndPoint(host, rpcPort);
this.username = username;
this.password = password;
this.fetchSize = fetchSize;
this.zoneId = zoneId;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
this.enableRedirection = enableRedirection;
this.version = version;
}
public Session(List<String> nodeUrls, String username, String password) {
this(
nodeUrls,
username,
password,
SessionConfig.DEFAULT_FETCH_SIZE,
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
/**
* Multiple nodeUrl,If one node down, connect to the next one
*
* @param nodeUrls List<String> Multiple ip:rpcPort eg.127.0.0.1:9001
*/
public Session(List<String> nodeUrls, String username, String password, int fetchSize) {
this(
nodeUrls,
username,
password,
fetchSize,
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
public Session(List<String> nodeUrls, String username, String password, ZoneId zoneId) {
this(
nodeUrls,
username,
password,
SessionConfig.DEFAULT_FETCH_SIZE,
zoneId,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
public Session(
List<String> nodeUrls,
String username,
String password,
int fetchSize,
ZoneId zoneId,
int thriftDefaultBufferSize,
int thriftMaxFrameSize,
boolean enableRedirection,
Version version) {
if (nodeUrls.isEmpty()) {
throw new IllegalArgumentException("nodeUrls shouldn't be empty.");
}
this.nodeUrls = nodeUrls;
this.username = username;
this.password = password;
this.fetchSize = fetchSize;
this.zoneId = zoneId;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
this.enableRedirection = enableRedirection;
this.version = version;
}
public Session(Builder builder) {
if (builder.nodeUrls != null) {
if (builder.nodeUrls.isEmpty()) {
throw new IllegalArgumentException("nodeUrls shouldn't be empty.");
}
this.nodeUrls = builder.nodeUrls;
this.enableQueryRedirection = true;
} else {
this.defaultEndPoint = new TEndPoint(builder.host, builder.rpcPort);
this.enableQueryRedirection = builder.enableRedirection;
}
this.enableRedirection = builder.enableRedirection;
this.enableRecordsAutoConvertTablet = builder.enableRecordsAutoConvertTablet;
this.username = builder.username;
this.password = builder.pw;
this.fetchSize = builder.fetchSize;
this.zoneId = builder.zoneId;
this.thriftDefaultBufferSize = builder.thriftDefaultBufferSize;
this.thriftMaxFrameSize = builder.thriftMaxFrameSize;
this.version = builder.version;
this.useSSL = builder.useSSL;
this.trustStore = builder.trustStore;
this.trustStorePwd = builder.trustStorePwd;
this.enableAutoFetch = builder.enableAutoFetch;
this.maxRetryCount = builder.maxRetryCount;
this.retryIntervalInMs = builder.retryIntervalInMs;
}
@Override
public void setFetchSize(int fetchSize) {
this.fetchSize = fetchSize;
}
@Override
public int getFetchSize() {
return this.fetchSize;
}
@Override
public Version getVersion() {
return version;
}
@Override
public void setVersion(Version version) {
this.version = version;
}
@Override
public synchronized void open() throws IoTDBConnectionException {
open(false, SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@Override
public synchronized void open(boolean enableRPCCompression) throws IoTDBConnectionException {
open(enableRPCCompression, SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@Override
public synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
throws IoTDBConnectionException {
if (!isClosed) {
return;
}
if (this.executorService != null) {
this.executorService.shutdown();
this.executorService = null;
}
if (this.availableNodes != null) {
this.availableNodes.close();
this.availableNodes = null;
}
if (enableAutoFetch) {
initThreadPool();
this.availableNodes =
NodesSupplier.createNodeSupplier(
getNodeUrls(),
executorService,
username,
password,
zoneId,
thriftDefaultBufferSize,
thriftMaxFrameSize,
connectionTimeoutInMs,
useSSL,
trustStore,
trustStorePwd,
enableRPCCompression,
version.toString());
} else {
this.availableNodes = new DummyNodesSupplier(getNodeUrls());
}
this.enableRPCCompression = enableRPCCompression;
this.connectionTimeoutInMs = connectionTimeoutInMs;
defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
isClosed = false;
if (enableRedirection || enableQueryRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
endPointToSessionConnection = new ConcurrentHashMap<>();
endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
}
}
private void initThreadPool() {
this.executorService =
Executors.newSingleThreadScheduledExecutor(
r -> {
Thread t =
new Thread(
Thread.currentThread().getThreadGroup(), r, "PeriodicalUpdateDNList", 0);
if (!t.isDaemon()) {
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
});
}
private List<TEndPoint> getNodeUrls() {
if (defaultEndPoint != null) {
return Collections.singletonList(defaultEndPoint);
} else {
return SessionUtils.parseSeedNodeUrls(nodeUrls);
}
}
@Override
public synchronized void open(
boolean enableRPCCompression,
int connectionTimeoutInMs,
Map<String, TEndPoint> deviceIdToEndpoint,
INodeSupplier nodesSupplier)
throws IoTDBConnectionException {
if (!isClosed) {
return;
}
this.availableNodes = nodesSupplier;
this.enableRPCCompression = enableRPCCompression;
this.connectionTimeoutInMs = connectionTimeoutInMs;
defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
isClosed = false;
if (enableRedirection || enableQueryRedirection) {
this.deviceIdToEndpoint = deviceIdToEndpoint;
endPointToSessionConnection = new ConcurrentHashMap<>();
endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
}
}
@Override
public synchronized void close() throws IoTDBConnectionException {
if (isClosed) {
return;
}
try {
if (enableRedirection) {
for (SessionConnection sessionConnection : endPointToSessionConnection.values()) {
sessionConnection.close();
}
} else {
defaultSessionConnection.close();
}
} finally {
// if executorService is null, it means that availableNodes is got from SessionPool and we
// shouldn't clean that
if (this.executorService != null) {
this.executorService.shutdown();
this.executorService = null;
this.availableNodes.close();
this.availableNodes = null;
}
isClosed = true;
}
}
public SessionConnection constructSessionConnection(
Session session, TEndPoint endpoint, ZoneId zoneId) throws IoTDBConnectionException {
if (endpoint == null) {
return new SessionConnection(
session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs);
}
return new SessionConnection(
session, endpoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs);
}
@Override
public synchronized String getTimeZone() {
return defaultSessionConnection.getTimeZone();
}
@Override
public synchronized void setTimeZone(String zoneId)
throws StatementExecutionException, IoTDBConnectionException {
defaultSessionConnection.setTimeZone(zoneId);
this.zoneId = ZoneId.of(zoneId);
}
/** Only changes the member variable of the Session object without sending it to server. */
@Override
public void setTimeZoneOfSession(String zoneId) {
defaultSessionConnection.setTimeZoneOfSession(zoneId);
this.zoneId = ZoneId.of(zoneId);
}
@Override
public void setStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.setStorageGroup(storageGroup);
}
@Override
public void deleteStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.deleteStorageGroups(Collections.singletonList(storageGroup));
}
@Override
public void deleteStorageGroups(List<String> storageGroups)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.deleteStorageGroups(storageGroups);
}
@Override
public void createDatabase(String database)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.setStorageGroup(database);
}
@Override
public void deleteDatabase(String database)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.deleteStorageGroups(Collections.singletonList(database));
}
@Override
public void deleteDatabases(List<String> databases)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.deleteStorageGroups(databases);
}
@Override
public void createTimeseries(
String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateTimeseriesReq request =
genTSCreateTimeseriesReq(path, dataType, encoding, compressor, null, null, null, null);
defaultSessionConnection.createTimeseries(request);
}
@Override
public void createTimeseries(
String path,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressor,
Map<String, String> props,
Map<String, String> tags,
Map<String, String> attributes,
String measurementAlias)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateTimeseriesReq request =
genTSCreateTimeseriesReq(
path, dataType, encoding, compressor, props, tags, attributes, measurementAlias);
defaultSessionConnection.createTimeseries(request);
}
private TSCreateTimeseriesReq genTSCreateTimeseriesReq(
String path,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressor,
Map<String, String> props,
Map<String, String> tags,
Map<String, String> attributes,
String measurementAlias) {
TSCreateTimeseriesReq request = new TSCreateTimeseriesReq();
request.setPath(path);
request.setDataType(dataType.ordinal());
request.setEncoding(encoding.ordinal());
request.setCompressor(compressor.serialize());
request.setProps(props);
request.setTags(tags);
request.setAttributes(attributes);
request.setMeasurementAlias(measurementAlias);
return request;
}
@Override
public void createAlignedTimeseries(
String deviceId,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
List<CompressionType> compressors,
List<String> measurementAliasList)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateAlignedTimeseriesReq request =
getTSCreateAlignedTimeseriesReq(
deviceId,
measurements,
dataTypes,
encodings,
compressors,
measurementAliasList,
null,
null);
defaultSessionConnection.createAlignedTimeseries(request);
}
@Override
public void createAlignedTimeseries(
String deviceId,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
List<CompressionType> compressors,
List<String> measurementAliasList,
List<Map<String, String>> tagsList,
List<Map<String, String>> attributesList)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateAlignedTimeseriesReq request =
getTSCreateAlignedTimeseriesReq(
deviceId,
measurements,
dataTypes,
encodings,
compressors,
measurementAliasList,
tagsList,
attributesList);
defaultSessionConnection.createAlignedTimeseries(request);
}
private TSCreateAlignedTimeseriesReq getTSCreateAlignedTimeseriesReq(
String prefixPath,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
List<CompressionType> compressors,
List<String> measurementAliasList,
List<Map<String, String>> tagsList,
List<Map<String, String>> attributesList) {
TSCreateAlignedTimeseriesReq request = new TSCreateAlignedTimeseriesReq();
request.setPrefixPath(prefixPath);
request.setMeasurements(measurements);
request.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
request.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
request.setCompressors(
compressors.stream().map(i -> (int) i.serialize()).collect(Collectors.toList()));
request.setMeasurementAlias(measurementAliasList);
request.setTagsList(tagsList);
request.setAttributesList(attributesList);
return request;
}
@Override
public void createMultiTimeseries(
List<String> paths,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
List<CompressionType> compressors,
List<Map<String, String>> propsList,
List<Map<String, String>> tagsList,
List<Map<String, String>> attributesList,
List<String> measurementAliasList)
throws IoTDBConnectionException, StatementExecutionException {
TSCreateMultiTimeseriesReq request =
genTSCreateMultiTimeseriesReq(
paths,
dataTypes,
encodings,
compressors,
propsList,
tagsList,
attributesList,
measurementAliasList);
defaultSessionConnection.createMultiTimeseries(request);
}
private TSCreateMultiTimeseriesReq genTSCreateMultiTimeseriesReq(
List<String> paths,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
List<CompressionType> compressors,
List<Map<String, String>> propsList,
List<Map<String, String>> tagsList,
List<Map<String, String>> attributesList,
List<String> measurementAliasList) {
TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
request.setPaths(paths);
List<Integer> dataTypeOrdinals = new ArrayList<>(dataTypes.size());
for (TSDataType dataType : dataTypes) {
dataTypeOrdinals.add(dataType.ordinal());
}
request.setDataTypes(dataTypeOrdinals);
List<Integer> encodingOrdinals = new ArrayList<>(dataTypes.size());
for (TSEncoding encoding : encodings) {
encodingOrdinals.add(encoding.ordinal());
}
request.setEncodings(encodingOrdinals);
List<Integer> compressionOrdinals = new ArrayList<>(paths.size());
for (CompressionType compression : compressors) {
compressionOrdinals.add((int) compression.serialize());
}
request.setCompressors(compressionOrdinals);
request.setPropsList(propsList);
request.setTagsList(tagsList);
request.setAttributesList(attributesList);
request.setMeasurementAliasList(measurementAliasList);
return request;
}
@Override
public boolean checkTimeseriesExists(String path)
throws IoTDBConnectionException, StatementExecutionException {
return defaultSessionConnection.checkTimeseriesExists(path, queryTimeoutInMs);
}
@Override
public void setQueryTimeout(long timeoutInMs) {
this.queryTimeoutInMs = timeoutInMs;
}
@Override
public long getQueryTimeout() {
return queryTimeoutInMs;
}
/**
* execute query sql
*
* @param sql query statement
* @return result set
*/
@Override
public SessionDataSet executeQueryStatement(String sql)
throws StatementExecutionException, IoTDBConnectionException {
return executeStatementMayRedirect(sql, queryTimeoutInMs);
}
/**
* execute query sql with explicit timeout
*
* @param sql query statement
* @param timeoutInMs the timeout of this query, in milliseconds
* @return result set
*/
@Override
public SessionDataSet executeQueryStatement(String sql, long timeoutInMs)
throws StatementExecutionException, IoTDBConnectionException {
return executeStatementMayRedirect(sql, timeoutInMs);
}
/**
* execute the query, may redirect query to other node.
*
* @param sql the query statement
* @param timeoutInMs time in ms
* @return data set
* @throws StatementExecutionException statement is not right
* @throws IoTDBConnectionException the network is not good
*/
private SessionDataSet executeStatementMayRedirect(String sql, long timeoutInMs)
throws StatementExecutionException, IoTDBConnectionException {
try {
return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
return defaultSessionConnection.executeQueryStatement(sql, queryTimeoutInMs);
} catch (RedirectException redirectException) {
logger.error("{} redirect twice", sql, redirectException);
throw new StatementExecutionException(sql + " redirect twice, please try again.");
}
} else {
throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
}
}
}
/**
* execute non query statement
*
* @param sql non query statement
*/
@Override
public void executeNonQueryStatement(String sql)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.executeNonQueryStatement(sql);
}
/**
* query eg. select * from paths where time >= startTime and time < endTime time interval include
* startTime and exclude endTime
*
* @param paths series path
* @param startTime included
* @param endTime excluded
* @return data set
* @throws StatementExecutionException statement is not right
* @throws IoTDBConnectionException the network is not good
*/
@Override
public SessionDataSet executeRawDataQuery(
List<String> paths, long startTime, long endTime, long timeOut)
throws StatementExecutionException, IoTDBConnectionException {
try {
return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime, timeOut);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime, timeOut);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
}
} else {
throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
}
}
}
@Override
public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
throws StatementExecutionException, IoTDBConnectionException {
return executeRawDataQuery(paths, startTime, endTime, queryTimeoutInMs);
}
@Override
public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime)
throws StatementExecutionException, IoTDBConnectionException {
return executeLastDataQuery(paths, lastTime, queryTimeoutInMs);
}
/**
* query e.g. select last data from paths where time >= lastTime
*
* @param paths timeSeries eg. root.ln.d1.s1,root.ln.d1.s2
* @param lastTime get the last data, whose timestamp is greater than or equal lastTime e.g.
* 1621326244168
*/
@Override
public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime, long timeOut)
throws StatementExecutionException, IoTDBConnectionException {
try {
return defaultSessionConnection.executeLastDataQuery(paths, lastTime, timeOut);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
return defaultSessionConnection.executeLastDataQuery(paths, lastTime, timeOut);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
}
} else {
throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
}
}
}
/**
* query eg. select last status from root.ln.wf01.wt01; <PrefixPath> + <suffixPath> = <TimeSeries>
*
* @param paths timeSeries. eg.root.ln.d1.s1,root.ln.d1.s2
*/
@Override
public SessionDataSet executeLastDataQuery(List<String> paths)
throws StatementExecutionException, IoTDBConnectionException {
long time = 0L;
return executeLastDataQuery(paths, time, queryTimeoutInMs);
}
@Override
public SessionDataSet executeLastDataQueryForOneDevice(
String db, String device, List<String> sensors, boolean isLegalPathNodes)
throws StatementExecutionException, IoTDBConnectionException {
Pair<SessionDataSet, TEndPoint> pair;
try {
pair =
getSessionConnection(device)
.executeLastDataQueryForOneDevice(
db, device, sensors, isLegalPathNodes, queryTimeoutInMs);
if (pair.right != null) {
handleRedirection(device, pair.right);
}
return pair.left;
} catch (IoTDBConnectionException e) {
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(device) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(device));
deviceIdToEndpoint.remove(device);
// reconnect with default connection
return defaultSessionConnection.executeLastDataQueryForOneDevice(
db, device, sensors, isLegalPathNodes, queryTimeoutInMs)
.left;
} else {
throw e;
}
}
}
@Override
public SessionDataSet executeAggregationQuery(
List<String> paths, List<TAggregationType> aggregations)
throws StatementExecutionException, IoTDBConnectionException {
try {
return defaultSessionConnection.executeAggregationQuery(paths, aggregations);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
return defaultSessionConnection.executeAggregationQuery(paths, aggregations);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
}
} else {
throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
}
}
}
@Override
public SessionDataSet executeAggregationQuery(
List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime)
throws StatementExecutionException, IoTDBConnectionException {
try {
return defaultSessionConnection.executeAggregationQuery(
paths, aggregations, startTime, endTime);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
return defaultSessionConnection.executeAggregationQuery(
paths, aggregations, startTime, endTime);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
}
} else {
throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
}
}
}
@Override
public SessionDataSet executeAggregationQuery(
List<String> paths,
List<TAggregationType> aggregations,
long startTime,
long endTime,
long interval)
throws StatementExecutionException, IoTDBConnectionException {
try {
return defaultSessionConnection.executeAggregationQuery(
paths, aggregations, startTime, endTime, interval);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
return defaultSessionConnection.executeAggregationQuery(
paths, aggregations, startTime, endTime, interval);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
}
} else {
throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
}
}
}
@Override
public SessionDataSet executeAggregationQuery(
List<String> paths,
List<TAggregationType> aggregations,
long startTime,
long endTime,
long interval,
long slidingStep)
throws StatementExecutionException, IoTDBConnectionException {
try {
return defaultSessionConnection.executeAggregationQuery(
paths, aggregations, startTime, endTime, interval, slidingStep);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
return defaultSessionConnection.executeAggregationQuery(
paths, aggregations, startTime, endTime, interval, slidingStep);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
}
} else {
throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
}
}
}
/**
* insert data in one row, if you want to improve your performance, please use insertRecords
* method or insertTablet method
*
* @see Session#insertRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertRecord(
String deviceId,
long time,
List<String> measurements,
List<TSDataType> types,
Object... values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordReq request;
try {
request =
filterAndGenTSInsertRecordReq(
deviceId, time, measurements, types, Arrays.asList(values), false);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
return;
}
insertRecord(deviceId, request);
}
private void insertRecord(String prefixPath, TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
try {
getSessionConnection(prefixPath).insertRecord(request);
} catch (RedirectException e) {
handleRedirection(prefixPath, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(prefixPath) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(prefixPath));
deviceIdToEndpoint.remove(prefixPath);
// reconnect with default connection
try {
defaultSessionConnection.insertRecord(request);
} catch (RedirectException ignored) {
}
} else {
throw e;
}
}
}
private void insertRecord(String deviceId, TSInsertStringRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
try {
getSessionConnection(deviceId).insertRecord(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(deviceId) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
deviceIdToEndpoint.remove(deviceId);
// reconnect with default connection
try {
defaultSessionConnection.insertRecord(request);
} catch (RedirectException ignored) {
}
} else {
throw e;
}
}
}
private SessionConnection getSessionConnection(String deviceId) {
TEndPoint endPoint;
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
&& (endPoint = deviceIdToEndpoint.get(deviceId)) != null
&& endPointToSessionConnection.containsKey(endPoint)) {
return endPointToSessionConnection.get(endPoint);
} else {
return defaultSessionConnection;
}
}
@Override
public String getTimestampPrecision() throws TException {
return defaultSessionConnection.getClient().getProperties().getTimestampPrecision();
}
// TODO https://issues.apache.org/jira/browse/IOTDB-1399
public void removeBrokenSessionConnection(SessionConnection sessionConnection) {
// remove the cached broken leader session
if (enableRedirection) {
TEndPoint endPoint = null;
if (endPointToSessionConnection != null) {
for (Iterator<Entry<TEndPoint, SessionConnection>> it =
endPointToSessionConnection.entrySet().iterator();
it.hasNext(); ) {
Entry<TEndPoint, SessionConnection> entry = it.next();
if (entry.getValue().equals(sessionConnection)) {
endPoint = entry.getKey();
it.remove();
break;
}
}
}
if (deviceIdToEndpoint != null) {
for (Iterator<Entry<String, TEndPoint>> it = deviceIdToEndpoint.entrySet().iterator();
it.hasNext(); ) {
Entry<String, TEndPoint> entry = it.next();
if (entry.getValue().equals(endPoint)) {
it.remove();
}
}
}
}
}
private void handleRedirection(String deviceId, TEndPoint endpoint) {
if (enableRedirection) {
// no need to redirection
if (endpoint.ip.equals("0.0.0.0")) {
return;
}
AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
if (!deviceIdToEndpoint.containsKey(deviceId)
|| !deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
deviceIdToEndpoint.put(deviceId, endpoint);
}
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
endpoint,
k -> {
try {
return constructSessionConnection(this, endpoint, zoneId);
} catch (IoTDBConnectionException ex) {
exceptionReference.set(ex);
return null;
}
});
if (connection == null) {
deviceIdToEndpoint.remove(deviceId);
}
}
}
private void handleQueryRedirection(TEndPoint endPoint) throws IoTDBConnectionException {
if (enableQueryRedirection) {
AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
endPoint,
k -> {
try {
SessionConnection sessionConnection =
constructSessionConnection(this, endPoint, zoneId);
sessionConnection.setEnableRedirect(enableQueryRedirection);
return sessionConnection;
} catch (IoTDBConnectionException ex) {
exceptionReference.set(ex);
return null;
}
});
if (connection == null) {
throw new IoTDBConnectionException(exceptionReference.get());
}
defaultSessionConnection = connection;
}
}
/**
* insert data in one row, if you want improve your performance, please use insertRecords method
* or insertTablet method
*
* @see Session#insertRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertRecord(
String deviceId,
long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
// not vector by default
TSInsertRecordReq request;
try {
request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, false);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
return;
}
insertRecord(deviceId, request);
}
/**
* insert aligned data in one row, if you want improve your performance, please use
* insertAlignedRecords method or insertTablet method.
*
* @see Session#insertAlignedRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertAlignedRecord(
String deviceId,
long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordReq request;
try {
request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, true);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
return;
}
insertRecord(deviceId, request);
}
private TSInsertRecordReq filterAndGenTSInsertRecordReq(
String prefixPath,
long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values,
boolean isAligned)
throws IoTDBConnectionException {
if (hasNull(values)) {
measurements = new ArrayList<>(measurements);
values = new ArrayList<>(values);
types = new ArrayList<>(types);
boolean isAllValuesNull =
filterNullValueAndMeasurement(prefixPath, measurements, types, values);
if (isAllValuesNull) {
throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
}
}
return genTSInsertRecordReq(prefixPath, time, measurements, types, values, isAligned);
}
private TSInsertRecordReq genTSInsertRecordReq(
String prefixPath,
long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values,
boolean isAligned)
throws IoTDBConnectionException {
TSInsertRecordReq request = new TSInsertRecordReq();
request.setPrefixPath(prefixPath);
request.setTimestamp(time);
request.setMeasurements(measurements);
ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
request.setValues(buffer);
request.setIsAligned(isAligned);
return request;
}
/**
* insert data in one row, if you want improve your performance, please use insertRecords method
* or insertTablet method
*
* @see Session#insertRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertRecord(
String deviceId, long time, List<String> measurements, List<String> values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertStringRecordReq request;
try {
request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, false);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
return;
}
insertRecord(deviceId, request);
}
/**
* insert aligned data in one row, if you want improve your performance, please use
* insertAlignedRecords method or insertTablet method.
*
* @see Session#insertAlignedRecords(List, List, List, List, List)
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertAlignedRecord(
String deviceId, long time, List<String> measurements, List<String> values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertStringRecordReq request;
try {
request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, true);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
return;
}
insertRecord(deviceId, request);
}
private TSInsertStringRecordReq filterAndGenTSInsertStringRecordReq(
String prefixPath,
long time,
List<String> measurements,
List<String> values,
boolean isAligned) {
if (hasNull(values)) {
measurements = new ArrayList<>(measurements);
values = new ArrayList<>(values);
boolean isAllValueNull =
filterNullValueAndMeasurementWithStringType(values, prefixPath, measurements);
if (isAllValueNull) {
throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
}
}
return genTSInsertStringRecordReq(prefixPath, time, measurements, values, isAligned);
}
private TSInsertStringRecordReq genTSInsertStringRecordReq(
String prefixPath,
long time,
List<String> measurements,
List<String> values,
boolean isAligned) {
TSInsertStringRecordReq request = new TSInsertStringRecordReq();
request.setPrefixPath(prefixPath);
request.setTimestamp(time);
request.setMeasurements(measurements);
request.setValues(values);
request.setIsAligned(isAligned);
return request;
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
*
* <p>Each row is independent, which could have different deviceId, time, number of measurements
*
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertRecords(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
if (enableRedirection) {
insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, false);
} else {
TSInsertStringRecordsReq request;
try {
request =
filterAndGenTSInsertStringRecordsReq(
deviceIds, times, measurementsList, valuesList, false);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
return;
}
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException ignored) {
}
}
}
/**
* When the value is null,filter this,don't use this measurement.
*
* @param times
* @param measurementsList
* @param valuesList
* @param typesList
*/
private void filterNullValueAndMeasurement(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<Object>> valuesList,
List<List<TSDataType>> typesList) {
for (int i = valuesList.size() - 1; i >= 0; i--) {
List<Object> values = valuesList.get(i);
List<String> measurements = measurementsList.get(i);
List<TSDataType> types = typesList.get(i);
boolean isAllValuesNull =
filterNullValueAndMeasurement(deviceIds.get(i), measurements, types, values);
if (isAllValuesNull) {
valuesList.remove(i);
measurementsList.remove(i);
deviceIds.remove(i);
times.remove(i);
typesList.remove(i);
}
}
if (valuesList.isEmpty()) {
throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
}
}
/**
* Filter the null value of list。
*
* @param deviceId
* @param times
* @param measurementsList
* @param typesList
* @param valuesList
*/
private void filterNullValueAndMeasurementOfOneDevice(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList) {
for (int i = valuesList.size() - 1; i >= 0; i--) {
List<Object> values = valuesList.get(i);
List<String> measurements = measurementsList.get(i);
List<TSDataType> types = typesList.get(i);
boolean isAllValuesNull =
filterNullValueAndMeasurement(deviceId, measurements, types, values);
if (isAllValuesNull) {
valuesList.remove(i);
measurementsList.remove(i);
typesList.remove(i);
times.remove(i);
}
}
if (valuesList.isEmpty()) {
throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
}
}
/**
* Filter the null value of list。
*
* @param times
* @param deviceId
* @param measurementsList
* @param valuesList
*/
private void filterNullValueAndMeasurementWithStringTypeOfOneDevice(
List<Long> times,
String deviceId,
List<List<String>> measurementsList,
List<List<String>> valuesList) {
for (int i = valuesList.size() - 1; i >= 0; i--) {
List<String> values = valuesList.get(i);
List<String> measurements = measurementsList.get(i);
boolean isAllValuesNull =
filterNullValueAndMeasurementWithStringType(values, deviceId, measurements);
if (isAllValuesNull) {
valuesList.remove(i);
measurementsList.remove(i);
times.remove(i);
}
}
if (valuesList.isEmpty()) {
throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
}
}
/**
* Filter the null object of list。
*
* @param deviceId
* @param measurementsList
* @param types
* @param valuesList
* @return true:all value is null;false:not all null value is null.
*/
private boolean filterNullValueAndMeasurement(
String deviceId,
List<String> measurementsList,
List<TSDataType> types,
List<Object> valuesList) {
Map<String, Object> nullMap = new HashMap<>();
for (int i = valuesList.size() - 1; i >= 0; i--) {
if (valuesList.get(i) == null) {
nullMap.put(measurementsList.get(i), valuesList.get(i));
valuesList.remove(i);
measurementsList.remove(i);
types.remove(i);
}
}
if (valuesList.isEmpty()) {
logger.info("All values of the {} are null,null values are {}", deviceId, nullMap);
return true;
} else {
logger.info("Some values of {} are null,null values are {}", deviceId, nullMap);
}
return false;
}
/**
* Filter the null object of list。
*
* @param prefixPaths devices path。
* @param times
* @param measurementsList
* @param valuesList
* @return true:all values of valuesList are null;false:Not all values of valuesList are null.
*/
private void filterNullValueAndMeasurementWithStringType(
List<String> prefixPaths,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList) {
for (int i = valuesList.size() - 1; i >= 0; i--) {
List<String> values = valuesList.get(i);
List<String> measurements = measurementsList.get(i);
boolean isAllValueNull =
filterNullValueAndMeasurementWithStringType(values, prefixPaths.get(i), measurements);
if (isAllValueNull) {
valuesList.remove(i);
measurementsList.remove(i);
times.remove(i);
prefixPaths.remove(i);
}
}
if (valuesList.isEmpty()) {
throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
}
}
/**
* When the value is null,filter this,don't use this measurement.
*
* @param valuesList
* @param measurementsList
* @return true:all value is null;false:not all null value is null.
*/
private boolean filterNullValueAndMeasurementWithStringType(
List<String> valuesList, String deviceId, List<String> measurementsList) {
Map<String, Object> nullMap = new HashMap<>();
for (int i = valuesList.size() - 1; i >= 0; i--) {
if (valuesList.get(i) == null) {
nullMap.put(measurementsList.get(i), valuesList.get(i));
valuesList.remove(i);
measurementsList.remove(i);
}
}
if (valuesList.isEmpty()) {
logger.info("All values of the {} are null,null values are {}", deviceId, nullMap);
return true;
} else {
logger.info("Some values of {} are null,null values are {}", deviceId, nullMap);
}
return false;
}
private boolean hasNull(List valuesList) {
boolean haveNull = false;
for (int i1 = 0; i1 < valuesList.size(); i1++) {
Object o = valuesList.get(i1);
if (o instanceof List) {
List o1 = (List) o;
if (hasNull(o1)) {
haveNull = true;
break;
}
} else {
if (o == null) {
haveNull = true;
break;
}
}
}
return haveNull;
}
/**
* Insert aligned multiple rows, which can reduce the overhead of network. This method is just
* like jdbc executeBatch, we pack some insert request in batch and send them to server. If you
* want improve your performance, please see insertTablet method
*
* <p>Each row is independent, which could have different prefixPath, time, number of
* subMeasurements
*
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertAlignedRecords(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"prefixPaths, times, subMeasurementsList and valuesList's size should be equal");
}
if (enableRedirection) {
insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, true);
} else {
TSInsertStringRecordsReq request;
try {
request =
filterAndGenTSInsertStringRecordsReq(
deviceIds, times, measurementsList, valuesList, true);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
return;
}
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException ignored) {
}
}
}
private void insertStringRecordsWithLeaderCache(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList,
boolean isAligned)
throws IoTDBConnectionException, StatementExecutionException {
Map<SessionConnection, TSInsertStringRecordsReq> recordsGroup = new HashMap<>();
for (int i = 0; i < deviceIds.size(); i++) {
final SessionConnection connection = getSessionConnection(deviceIds.get(i));
TSInsertStringRecordsReq request =
recordsGroup.getOrDefault(connection, new TSInsertStringRecordsReq());
request.setIsAligned(isAligned);
try {
filterAndUpdateTSInsertStringRecordsReq(
request, deviceIds.get(i), times.get(i), measurementsList.get(i), valuesList.get(i));
recordsGroup.putIfAbsent(connection, request);
} catch (NoValidValueException e) {
logger.warn(
ALL_VALUES_ARE_NULL,
deviceIds.get(i),
times.get(i),
measurementsList.get(i).toString());
}
}
insertByGroup(recordsGroup, SessionConnection::insertRecords);
}
private TSInsertStringRecordsReq filterAndGenTSInsertStringRecordsReq(
List<String> prefixPaths,
List<Long> time,
List<List<String>> measurements,
List<List<String>> values,
boolean isAligned) {
if (hasNull(values)) {
values = changeToArrayListWithStringType(values);
measurements = changeToArrayListWithStringType(measurements);
prefixPaths = new ArrayList<>(prefixPaths);
time = new ArrayList<>(time);
filterNullValueAndMeasurementWithStringType(prefixPaths, time, measurements, values);
}
return genTSInsertStringRecordsReq(prefixPaths, time, measurements, values, isAligned);
}
private List<List<String>> changeToArrayListWithStringType(List<List<String>> values) {
if (!(values instanceof ArrayList)) {
values = new ArrayList<>(values);
}
for (int i = 0; i < values.size(); i++) {
List<String> currentValue = values.get(i);
if (!(currentValue instanceof ArrayList)) {
values.set(i, new ArrayList<>(currentValue));
}
}
return values;
}
private List<List<Object>> changeToArrayList(List<List<Object>> values) {
if (!(values instanceof ArrayList)) {
values = new ArrayList<>(values);
}
for (int i = 0; i < values.size(); i++) {
List<Object> currentValue = values.get(i);
if (!(currentValue instanceof ArrayList)) {
values.set(i, new ArrayList<>(currentValue));
}
}
return values;
}
private List<List<TSDataType>> changeToArrayListWithTSDataType(List<List<TSDataType>> values) {
if (!(values instanceof ArrayList)) {
values = new ArrayList<>(values);
}
for (int i = 0; i < values.size(); i++) {
List<TSDataType> currentValue = values.get(i);
if (!(currentValue instanceof ArrayList)) {
values.set(i, new ArrayList<>(currentValue));
}
}
return values;
}
private TSInsertStringRecordsReq genTSInsertStringRecordsReq(
List<String> prefixPaths,
List<Long> time,
List<List<String>> measurements,
List<List<String>> values,
boolean isAligned) {
TSInsertStringRecordsReq request = new TSInsertStringRecordsReq();
request.setPrefixPaths(prefixPaths);
request.setTimestamps(time);
request.setMeasurementsList(measurements);
request.setValuesList(values);
request.setIsAligned(isAligned);
return request;
}
private void filterAndUpdateTSInsertStringRecordsReq(
TSInsertStringRecordsReq request,
String deviceId,
long time,
List<String> measurements,
List<String> values) {
if (hasNull(values)) {
measurements = new ArrayList<>(measurements);
values = new ArrayList<>(values);
boolean isAllValueNull =
filterNullValueAndMeasurementWithStringType(values, deviceId, measurements);
if (isAllValueNull) {
throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
}
}
updateTSInsertStringRecordsReq(request, deviceId, time, measurements, values);
}
private void updateTSInsertStringRecordsReq(
TSInsertStringRecordsReq request,
String deviceId,
long time,
List<String> measurements,
List<String> values) {
request.addToPrefixPaths(deviceId);
request.addToTimestamps(time);
request.addToMeasurementsList(measurements);
request.addToValuesList(values);
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
*
* <p>Each row is independent, which could have different deviceId, time, number of measurements
*
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertRecords(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
// judge if convert records to tablets.
if (enableRecordsAutoConvertTablet && len >= MIN_RECORDS_SIZE) {
Set<String> deviceSet = new HashSet<>(deviceIds);
if ((double) deviceSet.size() / deviceIds.size() <= CONVERT_THRESHOLD
&& judgeConvertOfMultiDevice(deviceIds, measurementsList)) {
convertToTabletsAndInsert(
deviceIds, times, measurementsList, typesList, valuesList, deviceSet.size(), false);
return;
}
}
// insert records
if (enableRedirection) {
insertRecordsWithLeaderCache(
deviceIds, times, measurementsList, typesList, valuesList, false);
} else {
TSInsertRecordsReq request;
try {
request =
filterAndGenTSInsertRecordsReq(
deviceIds, times, measurementsList, typesList, valuesList, false);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
return;
}
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException ignored) {
}
}
}
/**
* Insert aligned multiple rows, which can reduce the overhead of network. This method is just
* like jdbc executeBatch, we pack some insert request in batch and send them to server. If you
* want improve your performance, please see insertTablet method
*
* <p>Each row is independent, which could have different prefixPath, time, number of
* subMeasurements
*
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertAlignedRecords(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"prefixPaths, times, subMeasurementsList and valuesList's size should be equal");
}
// judge if convert records to tablets.
if (enableRecordsAutoConvertTablet && len >= MIN_RECORDS_SIZE) {
Set<String> deviceSet = new HashSet<>(deviceIds);
if ((double) deviceSet.size() / deviceIds.size() <= CONVERT_THRESHOLD
&& judgeConvertOfMultiDevice(deviceIds, measurementsList)) {
convertToTabletsAndInsert(
deviceIds, times, measurementsList, typesList, valuesList, deviceSet.size(), true);
return;
}
}
if (enableRedirection) {
insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList, true);
} else {
TSInsertRecordsReq request;
try {
request =
filterAndGenTSInsertRecordsReq(
deviceIds, times, measurementsList, typesList, valuesList, true);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
return;
}
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException ignored) {
}
}
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
*
* <p>Each row could have same deviceId but different time, number of measurements
*
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertRecordsOfOneDevice(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
}
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
* your performance, please see insertTablet method
*
* <p>Each row could have same deviceId but different time, number of measurements
*
* @param haveSorted deprecated, whether the times have been sorted
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertRecordsOfOneDevice(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList,
boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
int len = times.size();
if (len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(VALUES_SIZE_SHOULD_BE_EQUAL);
}
if (enableRecordsAutoConvertTablet
&& len >= MIN_RECORDS_SIZE
&& judgeConvertOfOneDevice(measurementsList)) {
convertToTabletAndInsert(deviceId, times, measurementsList, typesList, valuesList, false);
return;
}
TSInsertRecordsOfOneDeviceReq request;
try {
request =
filterAndGenTSInsertRecordsOfOneDeviceReq(
deviceId, times, measurementsList, typesList, valuesList, haveSorted, false);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL_WITH_TIME, deviceId, times, measurementsList);
return;
}
try {
getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(deviceId) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
deviceIdToEndpoint.remove(deviceId);
// reconnect with default connection
try {
defaultSessionConnection.insertRecordsOfOneDevice(request);
} catch (RedirectException ignored) {
}
} else {
throw e;
}
}
}
/**
* Insert multiple rows with String format data, which can reduce the overhead of network. This
* method is just like jdbc executeBatch, we pack some insert request in batch and send them to
* server. If you want improve your performance, please see insertTablet method
*
* <p>Each row could have same deviceId but different time, number of measurements, number of
* values as String
*
* @param haveSorted deprecated, whether the times have been sorted
*/
@Override
public void insertStringRecordsOfOneDevice(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList,
boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
int len = times.size();
if (len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(VALUES_SIZE_SHOULD_BE_EQUAL);
}
TSInsertStringRecordsOfOneDeviceReq req;
try {
req =
filterAndGenTSInsertStringRecordsOfOneDeviceReq(
deviceId, times, measurementsList, valuesList, haveSorted, false);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL_WITH_TIME, deviceId, times, measurementsList);
return;
}
try {
getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(deviceId) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
deviceIdToEndpoint.remove(deviceId);
// reconnect with default connection
try {
defaultSessionConnection.insertStringRecordsOfOneDevice(req);
} catch (RedirectException ignored) {
}
} else {
throw e;
}
}
}
/**
* Insert multiple rows with String format data, which can reduce the overhead of network. This
* method is just like jdbc executeBatch, we pack some insert request in batch and send them to
* server. If you want improve your performance, please see insertTablet method
*
* <p>Each row could have same deviceId but different time, number of measurements, number of
* values as String
*/
@Override
public void insertStringRecordsOfOneDevice(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList, false);
}
/**
* Insert aligned multiple rows, which can reduce the overhead of network. This method is just
* like jdbc executeBatch, we pack some insert request in batch and send them to server. If you
* want improve your performance, please see insertTablet method
*
* <p>Each row could have same prefixPath but different time, number of measurements
*
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertAlignedRecordsOfOneDevice(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
insertAlignedRecordsOfOneDevice(
deviceId, times, measurementsList, typesList, valuesList, false);
}
/**
* Insert aligned multiple rows, which can reduce the overhead of network. This method is just
* like jdbc executeBatch, we pack some insert request in batch and send them to server. If you
* want improve your performance, please see insertTablet method
*
* <p>Each row could have same prefixPath but different time, number of measurements
*
* @param haveSorted deprecated, whether the times have been sorted
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertAlignedRecordsOfOneDevice(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList,
boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
int len = times.size();
if (len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"times, subMeasurementsList and valuesList's size should be equal");
}
if (enableRecordsAutoConvertTablet
&& len >= MIN_RECORDS_SIZE
&& judgeConvertOfOneDevice(measurementsList)) {
convertToTabletAndInsert(deviceId, times, measurementsList, typesList, valuesList, true);
return;
}
TSInsertRecordsOfOneDeviceReq request;
try {
request =
filterAndGenTSInsertRecordsOfOneDeviceReq(
deviceId, times, measurementsList, typesList, valuesList, haveSorted, true);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL_WITH_TIME, deviceId, times, measurementsList);
return;
}
try {
getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(deviceId) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
deviceIdToEndpoint.remove(deviceId);
// reconnect with default connection
try {
defaultSessionConnection.insertRecordsOfOneDevice(request);
} catch (RedirectException ignored) {
}
} else {
throw e;
}
}
}
/**
* Insert multiple rows with String format data, which can reduce the overhead of network. This
* method is just like jdbc executeBatch, we pack some insert request in batch and send them to
* server. If you want improve your performance, please see insertTablet method
*
* <p>Each row could have same deviceId but different time, number of measurements, number of
* values as String
*
* @param haveSorted deprecated, whether the times have been sorted
*/
@Override
public void insertAlignedStringRecordsOfOneDevice(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList,
boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
int len = times.size();
if (len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(VALUES_SIZE_SHOULD_BE_EQUAL);
}
TSInsertStringRecordsOfOneDeviceReq req;
try {
req =
filterAndGenTSInsertStringRecordsOfOneDeviceReq(
deviceId, times, measurementsList, valuesList, haveSorted, true);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL_WITH_TIME, deviceId, times, measurementsList);
return;
}
try {
getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(deviceId) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
deviceIdToEndpoint.remove(deviceId);
// reconnect with default connection
try {
defaultSessionConnection.insertStringRecordsOfOneDevice(req);
} catch (RedirectException ignored) {
}
} else {
throw e;
}
}
}
/**
* Insert aligned multiple rows with String format data, which can reduce the overhead of network.
* This method is just like jdbc executeBatch, we pack some insert request in batch and send them
* to server. If you want improve your performance, please see insertTablet method
*
* <p>Each row could have same prefixPath but different time, number of measurements, number of
* values as String
*
* @see Session#insertTablet(Tablet)
*/
@Override
public void insertAlignedStringRecordsOfOneDevice(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
insertAlignedStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList, false);
}
private TSInsertRecordsOfOneDeviceReq filterAndGenTSInsertRecordsOfOneDeviceReq(
String prefixPath,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList,
boolean haveSorted,
boolean isAligned)
throws IoTDBConnectionException {
if (hasNull(valuesList)) {
measurementsList = changeToArrayListWithStringType(measurementsList);
valuesList = changeToArrayList(valuesList);
typesList = changeToArrayListWithTSDataType(typesList);
times = new ArrayList<>(times);
filterNullValueAndMeasurementOfOneDevice(
prefixPath, times, measurementsList, typesList, valuesList);
}
return genTSInsertRecordsOfOneDeviceReq(
prefixPath, times, measurementsList, typesList, valuesList, haveSorted, isAligned);
}
private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(
String prefixPath,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList,
boolean haveSorted,
boolean isAligned)
throws IoTDBConnectionException {
// check params size
int len = times.size();
if (len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(VALUES_SIZE_SHOULD_BE_EQUAL);
}
if (!checkSorted(times)) {
// sort
Integer[] index = new Integer[times.size()];
for (int i = 0; i < times.size(); i++) {
index[i] = i;
}
Arrays.sort(index, Comparator.comparingLong(times::get));
times.sort(Long::compareTo);
// sort measurementList
measurementsList = sortList(measurementsList, index);
// sort typesList
typesList = sortList(typesList, index);
// sort values
valuesList = sortList(valuesList, index);
}
TSInsertRecordsOfOneDeviceReq request = new TSInsertRecordsOfOneDeviceReq();
request.setPrefixPath(prefixPath);
request.setTimestamps(times);
request.setMeasurementsList(measurementsList);
List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
request.setValuesList(buffersList);
request.setIsAligned(isAligned);
return request;
}
private TSInsertStringRecordsOfOneDeviceReq filterAndGenTSInsertStringRecordsOfOneDeviceReq(
String prefixPath,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList,
boolean haveSorted,
boolean isAligned) {
if (hasNull(valuesList)) {
measurementsList = changeToArrayListWithStringType(measurementsList);
valuesList = changeToArrayListWithStringType(valuesList);
times = new ArrayList<>(times);
filterNullValueAndMeasurementWithStringTypeOfOneDevice(
times, prefixPath, measurementsList, valuesList);
}
return genTSInsertStringRecordsOfOneDeviceReq(
prefixPath, times, measurementsList, valuesList, haveSorted, isAligned);
}
private TSInsertStringRecordsOfOneDeviceReq genTSInsertStringRecordsOfOneDeviceReq(
String prefixPath,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList,
boolean haveSorted,
boolean isAligned) {
// check params size
int len = times.size();
if (len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(VALUES_SIZE_SHOULD_BE_EQUAL);
}
if (!checkSorted(times)) {
Integer[] index = new Integer[times.size()];
for (int i = 0; i < index.length; i++) {
index[i] = i;
}
Arrays.sort(index, Comparator.comparingLong(times::get));
times.sort(Long::compareTo);
// sort measurementsList
measurementsList = sortList(measurementsList, index);
// sort valuesList
valuesList = sortList(valuesList, index);
}
TSInsertStringRecordsOfOneDeviceReq req = new TSInsertStringRecordsOfOneDeviceReq();
req.setPrefixPath(prefixPath);
req.setTimestamps(times);
req.setMeasurementsList(measurementsList);
req.setValuesList(valuesList);
req.setIsAligned(isAligned);
return req;
}
/**
* Sort the input source list.
*
* <p>e.g. source: [1,2,3,4,5], index:[1,0,3,2,4], return : [2,1,4,3,5]
*
* @param source Input list
* @param index retuen order
* @param <T> Input type
* @return ordered list
*/
private static <T> List<T> sortList(List<T> source, Integer[] index) {
return Arrays.stream(index).map(source::get).collect(Collectors.toList());
}
private List<ByteBuffer> objectValuesListToByteBufferList(
List<List<Object>> valuesList, List<List<TSDataType>> typesList)
throws IoTDBConnectionException {
List<ByteBuffer> buffersList = new ArrayList<>();
for (int i = 0; i < valuesList.size(); i++) {
ByteBuffer buffer = SessionUtils.getValueBuffer(typesList.get(i), valuesList.get(i));
buffersList.add(buffer);
}
return buffersList;
}
private void insertRecordsWithLeaderCache(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList,
boolean isAligned)
throws IoTDBConnectionException, StatementExecutionException {
Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>();
for (int i = 0; i < deviceIds.size(); i++) {
final SessionConnection connection = getSessionConnection(deviceIds.get(i));
TSInsertRecordsReq request = recordsGroup.getOrDefault(connection, new TSInsertRecordsReq());
request.setIsAligned(isAligned);
try {
filterAndUpdateTSInsertRecordsReq(
request,
deviceIds.get(i),
times.get(i),
measurementsList.get(i),
typesList.get(i),
valuesList.get(i));
recordsGroup.putIfAbsent(connection, request);
} catch (NoValidValueException e) {
logger.warn(
"All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements are [{}]",
deviceIds.get(i),
times.get(i),
measurementsList.get(i));
}
}
insertByGroup(recordsGroup, SessionConnection::insertRecords);
}
private TSInsertRecordsReq filterAndGenTSInsertRecordsReq(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList,
boolean isAligned)
throws IoTDBConnectionException {
if (hasNull(valuesList)) {
measurementsList = changeToArrayListWithStringType(measurementsList);
valuesList = changeToArrayList(valuesList);
deviceIds = new ArrayList<>(deviceIds);
times = new ArrayList<>(times);
typesList = changeToArrayListWithTSDataType(typesList);
filterNullValueAndMeasurement(deviceIds, times, measurementsList, valuesList, typesList);
}
return genTSInsertRecordsReq(
deviceIds, times, measurementsList, typesList, valuesList, isAligned);
}
private TSInsertRecordsReq genTSInsertRecordsReq(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList,
boolean isAligned)
throws IoTDBConnectionException {
TSInsertRecordsReq request = new TSInsertRecordsReq();
request.setPrefixPaths(deviceIds);
request.setTimestamps(times);
request.setMeasurementsList(measurementsList);
request.setIsAligned(isAligned);
List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
request.setValuesList(buffersList);
return request;
}
private void filterAndUpdateTSInsertRecordsReq(
TSInsertRecordsReq request,
String deviceId,
Long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException {
if (hasNull(values)) {
measurements = new ArrayList<>(measurements);
types = new ArrayList<>(types);
values = new ArrayList<>(values);
boolean isAllValuesNull =
filterNullValueAndMeasurement(deviceId, measurements, types, values);
if (isAllValuesNull) {
throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
}
}
updateTSInsertRecordsReq(request, deviceId, time, measurements, types, values);
}
private void updateTSInsertRecordsReq(
TSInsertRecordsReq request,
String deviceId,
Long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException {
request.addToPrefixPaths(deviceId);
request.addToTimestamps(time);
request.addToMeasurementsList(measurements);
ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
request.addToValuesList(buffer);
}
/**
* insert the data of a device. For each timestamp, the number of measurements is the same.
*
* <p>a Tablet example: device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3
*
* <p>times in Tablet may be not in ascending order
*
* @param tablet data batch
*/
@Override
public void insertTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException {
insertTablet(tablet, false);
}
/**
* insert a Tablet
*
* @param tablet data batch
* @param sorted deprecated, whether times in Tablet are in ascending order
*/
@Override
public void insertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
try {
getSessionConnection(tablet.deviceId).insertTablet(request);
} catch (RedirectException e) {
handleRedirection(tablet.deviceId, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(tablet.deviceId) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(tablet.deviceId));
deviceIdToEndpoint.remove(tablet.deviceId);
// reconnect with default connection
try {
defaultSessionConnection.insertTablet(request);
} catch (RedirectException ignored) {
}
} else {
throw e;
}
}
}
/**
* insert the aligned timeseries data of a device. For each timestamp, the number of measurements
* is the same.
*
* <p>a Tablet example: device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3
*
* <p>times in Tablet may be not in ascending order
*
* @param tablet data batch
*/
@Override
public void insertAlignedTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException {
insertAlignedTablet(tablet, false);
}
/**
* insert the aligned timeseries data of a device.
*
* @param tablet data batch
* @param sorted deprecated, whether times in Tablet are in ascending order
*/
@Override
public void insertAlignedTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, true);
try {
getSessionConnection(tablet.deviceId).insertTablet(request);
} catch (RedirectException e) {
handleRedirection(tablet.deviceId, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(tablet.deviceId) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(tablet.deviceId));
deviceIdToEndpoint.remove(tablet.deviceId);
// reconnect with default connection
try {
defaultSessionConnection.insertTablet(request);
} catch (RedirectException ignored) {
}
} else {
throw e;
}
}
}
private TSInsertTabletReq genTSInsertTabletReq(Tablet tablet, boolean sorted, boolean isAligned) {
if (!checkSorted(tablet)) {
sortTablet(tablet);
}
TSInsertTabletReq request = new TSInsertTabletReq();
for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
request.addToMeasurements(measurementSchema.getMeasurementId());
request.addToTypes(measurementSchema.getType().ordinal());
}
request.setPrefixPath(tablet.deviceId);
request.setIsAligned(isAligned);
request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
request.setValues(SessionUtils.getValueBuffer(tablet));
request.setSize(tablet.rowSize);
return request;
}
/**
* insert the data of several deivces. Given a deivce, for each timestamp, the number of
* measurements is the same.
*
* <p>Times in each Tablet may not be in ascending order
*
* @param tablets data batch in multiple device
*/
@Override
public void insertTablets(Map<String, Tablet> tablets)
throws IoTDBConnectionException, StatementExecutionException {
insertTablets(tablets, false);
}
/**
* insert the data of several devices. Given a device, for each timestamp, the number of
* measurements is the same.
*
* @param tablets data batch in multiple device
* @param sorted deprecated, whether times in each Tablet are in ascending order
*/
@Override
public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
if (enableRedirection) {
insertTabletsWithLeaderCache(tablets, sorted, false);
} else {
TSInsertTabletsReq request =
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, false);
try {
defaultSessionConnection.insertTablets(request);
} catch (RedirectException ignored) {
}
}
}
/**
* insert aligned data of several deivces. Given a deivce, for each timestamp, the number of
* measurements is the same.
*
* <p>Times in each Tablet may not be in ascending order
*
* @param tablets data batch in multiple device
*/
@Override
public void insertAlignedTablets(Map<String, Tablet> tablets)
throws IoTDBConnectionException, StatementExecutionException {
insertAlignedTablets(tablets, false);
}
/**
* insert aligned data of several devices. Given a device, for each timestamp, the number of
* measurements is the same.
*
* @param tablets data batch in multiple device
* @param sorted deprecated, whether times in each Tablet are in ascending order
*/
@Override
public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
if (enableRedirection) {
insertTabletsWithLeaderCache(tablets, sorted, true);
} else {
TSInsertTabletsReq request =
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, true);
try {
defaultSessionConnection.insertTablets(request);
} catch (RedirectException ignored) {
}
}
}
private void insertTabletsWithLeaderCache(
Map<String, Tablet> tablets, boolean sorted, boolean isAligned)
throws IoTDBConnectionException, StatementExecutionException {
Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
for (Entry<String, Tablet> entry : tablets.entrySet()) {
final SessionConnection connection = getSessionConnection(entry.getKey());
TSInsertTabletsReq request =
tabletGroup.computeIfAbsent(connection, k -> new TSInsertTabletsReq());
updateTSInsertTabletsReq(request, entry.getValue(), sorted, isAligned);
}
insertByGroup(tabletGroup, SessionConnection::insertTablets);
}
private TSInsertTabletsReq genTSInsertTabletsReq(
List<Tablet> tablets, boolean sorted, boolean isAligned) throws BatchExecutionException {
TSInsertTabletsReq request = new TSInsertTabletsReq();
if (tablets.isEmpty()) {
throw new BatchExecutionException("No tablet is inserting!");
}
for (Tablet tablet : tablets) {
updateTSInsertTabletsReq(request, tablet, sorted, isAligned);
}
return request;
}
private void updateTSInsertTabletsReq(
TSInsertTabletsReq request, Tablet tablet, boolean sorted, boolean isAligned) {
if (!checkSorted(tablet)) {
sortTablet(tablet);
}
request.addToPrefixPaths(tablet.deviceId);
List<String> measurements = new ArrayList<>();
List<Integer> dataTypes = new ArrayList<>();
request.setIsAligned(isAligned);
for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
measurements.add(measurementSchema.getMeasurementId());
dataTypes.add(measurementSchema.getType().ordinal());
}
request.addToMeasurementsList(measurements);
request.addToTypesList(dataTypes);
request.addToTimestampsList(SessionUtils.getTimeBuffer(tablet));
request.addToValuesList(SessionUtils.getValueBuffer(tablet));
request.addToSizeList(tablet.rowSize);
}
// sample some records and judge weather need to add too many null values to convert to tablet.
private boolean judgeConvertOfOneDevice(List<List<String>> measurementsList) {
int size = measurementsList.size();
int sampleNum = (int) (size * SAMPLE_PROPORTION);
List<Integer> indexList =
ThreadLocalRandom.current()
.ints(0, size)
.distinct()
.limit(sampleNum)
.boxed()
.collect(Collectors.toList());
Set<String> allMeasurement =
new HashSet<>(measurementsList.get(indexList.get(0)).size() + 1, 1);
for (int i = 0; i < sampleNum; i++) {
allMeasurement.addAll(measurementsList.get(indexList.get(i)));
}
for (int i = 0; i < sampleNum; i++) {
if ((double) measurementsList.get(indexList.get(i)).size() / allMeasurement.size()
< CONVERT_THRESHOLD) {
return false;
}
}
return true;
}
// convert records of one device to tablet and insert
private void convertToTabletAndInsert(
String deviceId,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList,
boolean isAligned)
throws IoTDBConnectionException, StatementExecutionException {
// measurement -> <type,if null>
Map<String, Pair<TSDataType, Boolean>> measuremenMap =
new HashMap<>(measurementsList.get(0).size() + 1, 1);
// build measurementType
for (int rowIndex = 0; rowIndex < measurementsList.size(); rowIndex++) {
List<String> measurements = measurementsList.get(rowIndex);
List<TSDataType> types = typesList.get(rowIndex);
for (int colIndex = 0; colIndex < measurements.size(); colIndex++) {
String measurement = measurements.get(colIndex);
if (!measuremenMap.containsKey(measurement)) {
measuremenMap.put(measurement, new Pair<>(types.get(colIndex), true));
}
}
}
List<MeasurementSchema> schemaList = new ArrayList<>(measuremenMap.size());
// use measurementType to build schemaList
for (Entry<String, Pair<TSDataType, Boolean>> entry : measuremenMap.entrySet()) {
schemaList.add(new MeasurementSchema(entry.getKey(), entry.getValue().getLeft()));
}
// build tablet and insert
Tablet tablet = new Tablet(deviceId, schemaList, times.size());
for (int rowIndex = 0; rowIndex < times.size(); rowIndex++) {
addRecordToTablet(
tablet,
times.get(rowIndex),
measurementsList.get(rowIndex),
valuesList.get(rowIndex),
measuremenMap);
}
if (isAligned) {
insertAlignedTablet(tablet);
} else {
insertTablet(tablet);
}
}
// sample some records and judge weather need to add too many null values to convert to tablet.
private boolean judgeConvertOfMultiDevice(
List<String> deviceIds, List<List<String>> measurementsList) {
int size = deviceIds.size();
int sampleNum = (int) (size * SAMPLE_PROPORTION);
Map<String, Set<String>> measurementMap = new HashMap<>(sampleNum + 1, 1);
List<Integer> indexList =
ThreadLocalRandom.current()
.ints(0, size)
.distinct()
.limit(sampleNum)
.boxed()
.collect(Collectors.toList());
for (int i = 0; i < sampleNum; i++) {
int index = indexList.get(i);
List<String> measurements = measurementsList.get(index);
Set<String> allMeasurement =
measurementMap.computeIfAbsent(
deviceIds.get(index), k -> new HashSet<>(measurements.size() + 1, 1));
allMeasurement.addAll(measurements);
}
for (int i = 0; i < sampleNum; i++) {
int index = indexList.get(i);
Set<String> allMeasurement = measurementMap.get(deviceIds.get(index));
if ((double) measurementsList.get(index).size() / allMeasurement.size() < CONVERT_THRESHOLD) {
return false;
}
}
return true;
}
// convert records of multiple devices to tablets and insert
private void convertToTabletsAndInsert(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList,
int deviceSize,
boolean isAligned)
throws IoTDBConnectionException, StatementExecutionException {
// device -> measurement -> <type,if null>
Map<String, Map<String, Pair<TSDataType, Boolean>>> deviceMeasuremenMap =
new HashMap<>(deviceSize + 1, 1);
// device -> row count
Map<String, Integer> rowMap = new HashMap<>(deviceSize + 1, 1);
// first build measurementTypeMap and rowMap
for (int rowIndex = 0; rowIndex < deviceIds.size(); rowIndex++) {
String device = deviceIds.get(rowIndex);
List<String> measurements = measurementsList.get(rowIndex);
List<TSDataType> types = typesList.get(rowIndex);
Map<String, Pair<TSDataType, Boolean>> measurementMap =
deviceMeasuremenMap.computeIfAbsent(
device, k -> new HashMap<>(measurements.size() + 1, 1));
for (int colIndex = 0; colIndex < measurements.size(); colIndex++) {
String measurement = measurements.get(colIndex);
if (!measurementMap.containsKey(measurement)) {
measurementMap.put(measurement, new Pair<>(types.get(colIndex), true));
}
}
rowMap.merge(device, 1, Integer::sum);
}
// device -> schema
Map<String, List<MeasurementSchema>> schemaMap = new HashMap<>(deviceSize + 1, 1);
// use measurementTypeMap to build schemaMap
for (Map.Entry<String, Map<String, Pair<TSDataType, Boolean>>> entry :
deviceMeasuremenMap.entrySet()) {
List<MeasurementSchema> schemaList = new ArrayList<>(entry.getValue().size() + 1);
for (Map.Entry<String, Pair<TSDataType, Boolean>> schemaEntry : entry.getValue().entrySet()) {
schemaList.add(
new MeasurementSchema(schemaEntry.getKey(), schemaEntry.getValue().getLeft()));
}
schemaMap.put(entry.getKey(), schemaList);
}
// device -> tablet
Map<String, Tablet> tablets = new HashMap<>(deviceSize + 1, 1);
// use schemaMap and rowMap to build tablets and insert
for (int rowIndex = 0; rowIndex < deviceIds.size(); rowIndex++) {
String device = deviceIds.get(rowIndex);
Tablet tablet =
tablets.computeIfAbsent(
device, k -> new Tablet(device, schemaMap.get(device), rowMap.get(device)));
addRecordToTablet(
tablet,
times.get(rowIndex),
measurementsList.get(rowIndex),
valuesList.get(rowIndex),
deviceMeasuremenMap.get(device));
}
if (isAligned) {
insertAlignedTablets(tablets);
} else {
insertTablets(tablets);
}
}
// add one record to tablet.
private void addRecordToTablet(
Tablet tablet,
Long timestamp,
List<String> measurements,
List<Object> values,
Map<String, Pair<TSDataType, Boolean>> allMeasurementMap) {
int row = tablet.rowSize++;
tablet.addTimestamp(row, timestamp);
// tablet without null value
if (measurements.size() == allMeasurementMap.size()) {
for (int i = 0; i < measurements.size(); i++) {
tablet.addValue(measurements.get(i), row, values.get(i));
}
return;
}
// tablet with null value
for (int i = 0; i < measurements.size(); i++) {
String measurement = measurements.get(i);
tablet.addValue(measurement, row, values.get(i));
allMeasurementMap.get(measurement).setRight(false);
}
for (Entry<String, Pair<TSDataType, Boolean>> entry : allMeasurementMap.entrySet()) {
if (entry.getValue().getRight()) {
tablet.addValue(entry.getKey(), row, null);
} else {
entry.getValue().setRight(true);
}
}
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
@Override
public void testInsertTablet(Tablet tablet)
throws IoTDBConnectionException, StatementExecutionException {
testInsertTablet(tablet, false);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
@Override
public void testInsertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
defaultSessionConnection.testInsertTablet(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
@Override
public void testInsertTablets(Map<String, Tablet> tablets)
throws IoTDBConnectionException, StatementExecutionException {
testInsertTablets(tablets, false);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
@Override
public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletsReq request =
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, false);
defaultSessionConnection.testInsertTablets(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
@Override
public void testInsertRecords(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertStringRecordsReq request;
try {
request =
filterAndGenTSInsertStringRecordsReq(
deviceIds, times, measurementsList, valuesList, false);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
return;
}
defaultSessionConnection.testInsertRecords(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
@Override
public void testInsertRecords(
List<String> deviceIds,
List<Long> times,
List<List<String>> measurementsList,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordsReq request;
try {
request =
filterAndGenTSInsertRecordsReq(
deviceIds, times, measurementsList, typesList, valuesList, false);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
return;
}
defaultSessionConnection.testInsertRecords(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
@Override
public void testInsertRecord(
String deviceId, long time, List<String> measurements, List<String> values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertStringRecordReq request;
try {
request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, false);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
return;
}
defaultSessionConnection.testInsertRecord(request);
}
/**
* This method NOT insert data into database and the server just return after accept the request,
* this method should be used to test other time cost in client
*/
@Override
public void testInsertRecord(
String deviceId,
long time,
List<String> measurements,
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertRecordReq request;
try {
request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, false);
} catch (NoValidValueException e) {
logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
return;
}
defaultSessionConnection.testInsertRecord(request);
}
/**
* delete a timeseries, including data and schema
*
* @param path timeseries to delete, should be a whole path
*/
@Override
public void deleteTimeseries(String path)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.deleteTimeseries(Collections.singletonList(path));
}
/**
* delete some timeseries, including data and schema
*
* @param paths timeseries to delete, should be a whole path
*/
@Override
public void deleteTimeseries(List<String> paths)
throws IoTDBConnectionException, StatementExecutionException {
defaultSessionConnection.deleteTimeseries(paths);
}
/**
* delete data <= time in one timeseries
*
* @param path data in which time series to delete
* @param endTime data with time stamp less than or equal to time will be deleted
*/
@Override
public void deleteData(String path, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
deleteData(Collections.singletonList(path), Long.MIN_VALUE, endTime);
}
/**
* delete data <= time in multiple timeseries
*
* @param paths data in which time series to delete
* @param endTime data with time stamp less than or equal to time will be deleted
*/
@Override
public void deleteData(List<String> paths, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
deleteData(paths, Long.MIN_VALUE, endTime);
}
/**
* delete data >= startTime and data <= endTime in multiple timeseries
*
* @param paths data in which time series to delete
* @param startTime delete range start time
* @param endTime delete range end time
*/
@Override
public void deleteData(List<String> paths, long startTime, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
TSDeleteDataReq request = genTSDeleteDataReq(paths, startTime, endTime);
defaultSessionConnection.deleteData(request);
}
private TSDeleteDataReq genTSDeleteDataReq(List<String> paths, long startTime, long endTime) {
TSDeleteDataReq request = new TSDeleteDataReq();
request.setPaths(paths);
request.setStartTime(startTime);
request.setEndTime(endTime);
return request;
}
/**
* check whether the batch has been sorted
*
* @return whether the batch has been sorted
*/
private boolean checkSorted(Tablet tablet) {
for (int i = 1; i < tablet.rowSize; i++) {
if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
return false;
}
}
return true;
}
private boolean checkSorted(List<Long> times) {
for (int i = 1; i < times.size(); i++) {
if (times.get(i) < times.get(i - 1)) {
return false;
}
}
return true;
}
@SuppressWarnings({
"squid:S3776"
}) // ignore Cognitive Complexity of methods should not be too high
public void sortTablet(Tablet tablet) {
/*
* following part of code sort the batch data by time,
* so we can insert continuous data in value list to get a better performance
*/
// sort to get index, and use index to sort value list
Integer[] index = new Integer[tablet.rowSize];
for (int i = 0; i < tablet.rowSize; i++) {
index[i] = i;
}
Arrays.sort(index, Comparator.comparingLong(o -> tablet.timestamps[o]));
Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
int columnIndex = 0;
for (int i = 0; i < tablet.getSchemas().size(); i++) {
IMeasurementSchema schema = tablet.getSchemas().get(i);
if (schema instanceof MeasurementSchema) {
tablet.values[columnIndex] = sortList(tablet.values[columnIndex], schema.getType(), index);
if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
tablet.bitMaps[columnIndex] = sortBitMap(tablet.bitMaps[columnIndex], index);
}
columnIndex++;
} else {
int measurementSize = schema.getSubMeasurementsList().size();
for (int j = 0; j < measurementSize; j++) {
tablet.values[columnIndex] =
sortList(
tablet.values[columnIndex],
schema.getSubMeasurementsTSDataTypeList().get(j),
index);
if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
tablet.bitMaps[columnIndex] = sortBitMap(tablet.bitMaps[columnIndex], index);
}
columnIndex++;
}
}
}
}
/**
* sort value list by index
*
* @param valueList value list
* @param dataType data type
* @param index index
* @return sorted list
*/
private Object sortList(Object valueList, TSDataType dataType, Integer[] index) {
switch (dataType) {
case BOOLEAN:
boolean[] boolValues = (boolean[]) valueList;
boolean[] sortedValues = new boolean[boolValues.length];
for (int i = 0; i < index.length; i++) {
sortedValues[i] = boolValues[index[i]];
}
return sortedValues;
case INT32:
int[] intValues = (int[]) valueList;
int[] sortedIntValues = new int[intValues.length];
for (int i = 0; i < index.length; i++) {
sortedIntValues[i] = intValues[index[i]];
}
return sortedIntValues;
case INT64:
long[] longValues = (long[]) valueList;
long[] sortedLongValues = new long[longValues.length];
for (int i = 0; i < index.length; i++) {
sortedLongValues[i] = longValues[index[i]];
}
return sortedLongValues;
case FLOAT:
float[] floatValues = (float[]) valueList;
float[] sortedFloatValues = new float[floatValues.length];
for (int i = 0; i < index.length; i++) {
sortedFloatValues[i] = floatValues[index[i]];
}
return sortedFloatValues;
case DOUBLE:
double[] doubleValues = (double[]) valueList;
double[] sortedDoubleValues = new double[doubleValues.length];
for (int i = 0; i < index.length; i++) {
sortedDoubleValues[i] = doubleValues[index[i]];
}
return sortedDoubleValues;
case TEXT:
Binary[] binaryValues = (Binary[]) valueList;
Binary[] sortedBinaryValues = new Binary[binaryValues.length];
for (int i = 0; i < index.length; i++) {
sortedBinaryValues[i] = binaryValues[index[i]];
}
return sortedBinaryValues;
default:
throw new UnSupportedDataTypeException(MSG_UNSUPPORTED_DATA_TYPE + dataType);
}
}
/**
* sort BitMap by index
*
* @param bitMap BitMap to be sorted
* @param index index
* @return sorted bitMap
*/
private BitMap sortBitMap(BitMap bitMap, Integer[] index) {
BitMap sortedBitMap = new BitMap(bitMap.getSize());
for (int i = 0; i < index.length; i++) {
if (bitMap.isMarked(index[i])) {
sortedBitMap.mark(i);
}
}
return sortedBitMap;
}
@Override
public void setSchemaTemplate(String templateName, String prefixPath)
throws IoTDBConnectionException, StatementExecutionException {
TSSetSchemaTemplateReq request = getTSSetSchemaTemplateReq(templateName, prefixPath);
defaultSessionConnection.setSchemaTemplate(request);
}
/**
* Construct Template at session and create it at server.
*
* <p>The template instance constructed within session is SUGGESTED to be a flat measurement
* template, which has no internal nodes inside a template.
*
* <p>For example, template(s1, s2, s3) is a flat measurement template, while template2(GPS.x,
* GPS.y, s1) is not.
*
* <p>Tree-structured template, which is contrary to flat measurement template, may not be
* supported in further version of IoTDB
*
* @see Template
*/
@Override
public void createSchemaTemplate(Template template)
throws IOException, IoTDBConnectionException, StatementExecutionException {
TSCreateSchemaTemplateReq req = new TSCreateSchemaTemplateReq();
req.setName(template.getName());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
template.serialize(baos);
req.setSerializedTemplate(baos.toByteArray());
baos.close();
defaultSessionConnection.createSchemaTemplate(req);
}
/**
* Create a template with flat measurements, not tree structured. Need to specify datatype,
* encoding and compressor of each measurement, and alignment of these measurements at once.
*
* @param measurements flat measurements of the template, cannot contain character dot
* @param dataTypes datatype of each measurement in the template
* @param encodings encodings of each measurement in the template
* @param compressors compression type of each measurement in the template
* @param isAligned specify whether these flat measurements are aligned
* @oaram templateName name of template to create
*/
@Override
public void createSchemaTemplate(
String templateName,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
List<CompressionType> compressors,
boolean isAligned)
throws IOException, IoTDBConnectionException, StatementExecutionException {
Template temp = new Template(templateName, isAligned);
int len = measurements.size();
if (len != dataTypes.size() || len != encodings.size() || len != compressors.size()) {
throw new StatementExecutionException(
"Different length of measurements, datatypes, encodings "
+ "or compressors when create device template.");
}
for (int idx = 0; idx < measurements.size(); idx++) {
MeasurementNode mNode =
new MeasurementNode(
measurements.get(idx), dataTypes.get(idx), encodings.get(idx), compressors.get(idx));
temp.addToTemplate(mNode);
}
createSchemaTemplate(temp);
}
/**
* Compatible for rel/0.12, this method will create an unaligned flat template as a result. Notice
* that there is no aligned concept in 0.12, so only the first measurement in each nested list
* matters.
*
* @param name name of the template
* @param schemaNames it works as a virtual layer inside template in 0.12, and makes no difference
* after 0.13
* @param measurements the first measurement in each nested list will constitute the final flat
* template
* @param dataTypes the data type of each measurement, only the first one in each nested list
* matters as above
* @param encodings the encoding of each measurement, only the first one in each nested list
* matters as above
* @param compressors the compressor of each measurement
* @throws IoTDBConnectionException
* @throws StatementExecutionException
* @deprecated
*/
@Override
@Deprecated
public void createSchemaTemplate(
String name,
List<String> schemaNames,
List<List<String>> measurements,
List<List<TSDataType>> dataTypes,
List<List<TSEncoding>> encodings,
List<CompressionType> compressors)
throws IoTDBConnectionException, StatementExecutionException {
List<String> flatMeasurements = new ArrayList<>();
List<TSDataType> flatDataTypes = new ArrayList<>();
List<TSEncoding> flatEncodings = new ArrayList<>();
for (int idx = 0; idx < measurements.size(); idx++) {
flatMeasurements.add(measurements.get(idx).get(0));
flatDataTypes.add(dataTypes.get(idx).get(0));
flatEncodings.add(encodings.get(idx).get(0));
}
try {
createSchemaTemplate(
name, flatMeasurements, flatDataTypes, flatEncodings, compressors, false);
} catch (IOException e) {
throw new StatementExecutionException(e.getMessage());
}
}
/**
* @param templateName Template to add aligned measurements.
* @param measurementsPath If measurements get different prefix, or the prefix already exists in
* template but not aligned, throw exception.
* @param dataTypes Data type of these measurements.
* @param encodings Encoding of these measurements.
* @param compressors CompressionType of these measurements.
*/
@Override
public void addAlignedMeasurementsInTemplate(
String templateName,
List<String> measurementsPath,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
List<CompressionType> compressors)
throws IOException, IoTDBConnectionException, StatementExecutionException {
TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
req.setName(templateName);
req.setMeasurements(measurementsPath);
req.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
req.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
req.setCompressors(
compressors.stream().map(i -> (int) i.serialize()).collect(Collectors.toList()));
req.setIsAligned(true);
defaultSessionConnection.appendSchemaTemplate(req);
}
/**
* @param templateName Template to add a single aligned measurement.
* @param measurementPath If prefix of the path exists in template and not aligned, throw
* exception.
*/
@Override
public void addAlignedMeasurementInTemplate(
String templateName,
String measurementPath,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressor)
throws IOException, IoTDBConnectionException, StatementExecutionException {
TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
req.setName(templateName);
req.setMeasurements(Collections.singletonList(measurementPath));
req.setDataTypes(Collections.singletonList(dataType.ordinal()));
req.setEncodings(Collections.singletonList(encoding.ordinal()));
req.setCompressors(Collections.singletonList((int) compressor.serialize()));
req.setIsAligned(true);
defaultSessionConnection.appendSchemaTemplate(req);
}
/**
* @param templateName Template to add unaligned measurements.
* @param measurementsPath If prefix of any path exist in template but aligned, throw exception.
*/
@Override
public void addUnalignedMeasurementsInTemplate(
String templateName,
List<String> measurementsPath,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
List<CompressionType> compressors)
throws IOException, IoTDBConnectionException, StatementExecutionException {
TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
req.setName(templateName);
req.setMeasurements(measurementsPath);
req.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
req.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
req.setCompressors(
compressors.stream().map(i -> (int) i.serialize()).collect(Collectors.toList()));
req.setIsAligned(false);
defaultSessionConnection.appendSchemaTemplate(req);
}
/**
* @param templateName Template to add a single unaligned measurement.
* @param measurementPath If prefix of path exists in template but aligned, throw exception.
*/
@Override
public void addUnalignedMeasurementInTemplate(
String templateName,
String measurementPath,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressor)
throws IOException, IoTDBConnectionException, StatementExecutionException {
TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
req.setName(templateName);
req.setMeasurements(Collections.singletonList(measurementPath));
req.setDataTypes(Collections.singletonList(dataType.ordinal()));
req.setEncodings(Collections.singletonList(encoding.ordinal()));
req.setCompressors(Collections.singletonList((int) compressor.serialize()));
req.setIsAligned(false);
defaultSessionConnection.appendSchemaTemplate(req);
}
/**
* @param templateName Template to prune.
* @param path Remove node from template specified by the path, including its children nodes.
*/
@Override
public void deleteNodeInTemplate(String templateName, String path)
throws IOException, IoTDBConnectionException, StatementExecutionException {
TSPruneSchemaTemplateReq req = new TSPruneSchemaTemplateReq();
req.setName(templateName);
req.setPath(path);
defaultSessionConnection.pruneSchemaTemplate(req);
}
/** @return Amount of measurements in the template */
@Override
public int countMeasurementsInTemplate(String name)
throws StatementExecutionException, IoTDBConnectionException {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName(name);
req.setQueryType(TemplateQueryType.COUNT_MEASUREMENTS.ordinal());
TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
return resp.getCount();
}
/** @return If the node specified by the path is a measurement. */
@Override
public boolean isMeasurementInTemplate(String templateName, String path)
throws StatementExecutionException, IoTDBConnectionException {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName(templateName);
req.setQueryType(TemplateQueryType.IS_MEASUREMENT.ordinal());
req.setMeasurement(path);
TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
return resp.result;
}
/** @return if there is a node correspond to the path in the template. */
@Override
public boolean isPathExistInTemplate(String templateName, String path)
throws StatementExecutionException, IoTDBConnectionException {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName(templateName);
req.setQueryType(TemplateQueryType.PATH_EXIST.ordinal());
req.setMeasurement(path);
TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
return resp.result;
}
/** @return All paths of measurements in the template. */
@Override
public List<String> showMeasurementsInTemplate(String templateName)
throws StatementExecutionException, IoTDBConnectionException {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName(templateName);
req.setQueryType(TemplateQueryType.SHOW_MEASUREMENTS.ordinal());
req.setMeasurement("");
TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
return resp.getMeasurements();
}
/** @return All paths of measurements under the pattern in the template. */
@Override
public List<String> showMeasurementsInTemplate(String templateName, String pattern)
throws StatementExecutionException, IoTDBConnectionException {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName(templateName);
req.setQueryType(TemplateQueryType.SHOW_MEASUREMENTS.ordinal());
req.setMeasurement(pattern);
TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
return resp.getMeasurements();
}
/** @return All template names. */
@Override
public List<String> showAllTemplates()
throws StatementExecutionException, IoTDBConnectionException {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName("");
req.setQueryType(TemplateQueryType.SHOW_TEMPLATES.ordinal());
TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
return resp.getMeasurements();
}
/** @return All paths have been set to designated template. */
@Override
public List<String> showPathsTemplateSetOn(String templateName)
throws StatementExecutionException, IoTDBConnectionException {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName(templateName);
req.setQueryType(TemplateQueryType.SHOW_SET_TEMPLATES.ordinal());
TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
return resp.getMeasurements();
}
/** @return All paths are using designated template. */
@Override
public List<String> showPathsTemplateUsingOn(String templateName)
throws StatementExecutionException, IoTDBConnectionException {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName(templateName);
req.setQueryType(TemplateQueryType.SHOW_USING_TEMPLATES.ordinal());
TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
return resp.getMeasurements();
}
@Override
public void unsetSchemaTemplate(String prefixPath, String templateName)
throws IoTDBConnectionException, StatementExecutionException {
TSUnsetSchemaTemplateReq request = getTSUnsetSchemaTemplateReq(prefixPath, templateName);
defaultSessionConnection.unsetSchemaTemplate(request);
}
@Override
public void dropSchemaTemplate(String templateName)
throws IoTDBConnectionException, StatementExecutionException {
TSDropSchemaTemplateReq request = getTSDropSchemaTemplateReq(templateName);
defaultSessionConnection.dropSchemaTemplate(request);
}
private TSSetSchemaTemplateReq getTSSetSchemaTemplateReq(String templateName, String prefixPath) {
TSSetSchemaTemplateReq request = new TSSetSchemaTemplateReq();
request.setTemplateName(templateName);
request.setPrefixPath(prefixPath);
return request;
}
private TSUnsetSchemaTemplateReq getTSUnsetSchemaTemplateReq(
String prefixPath, String templateName) {
TSUnsetSchemaTemplateReq request = new TSUnsetSchemaTemplateReq();
request.setPrefixPath(prefixPath);
request.setTemplateName(templateName);
return request;
}
private TSDropSchemaTemplateReq getTSDropSchemaTemplateReq(String templateName) {
TSDropSchemaTemplateReq request = new TSDropSchemaTemplateReq();
request.setTemplateName(templateName);
return request;
}
/**
* Create timeseries represented by device template under given device paths.
*
* @param devicePathList the target device paths used for timeseries creation
*/
@Override
public void createTimeseriesUsingSchemaTemplate(List<String> devicePathList)
throws IoTDBConnectionException, StatementExecutionException {
if (devicePathList == null || devicePathList.contains(null)) {
throw new StatementExecutionException(
"Given device path list should not be or contains null.");
}
TCreateTimeseriesUsingSchemaTemplateReq request = new TCreateTimeseriesUsingSchemaTemplateReq();
request.setDevicePathList(devicePathList);
defaultSessionConnection.createTimeseriesUsingSchemaTemplate(request);
}
/**
* @param recordsGroup connection to record map
* @param insertConsumer insert function
* @param <T>
* <ul>
* <li>{@link TSInsertRecordsReq}
* <li>{@link TSInsertStringRecordsReq}
* <li>{@link TSInsertTabletsReq}
* </ul>
*
* @throws IoTDBConnectionException
* @throws StatementExecutionException
*/
@SuppressWarnings({
"squid:S3776"
}) // ignore Cognitive Complexity of methods should not be too high
private <T> void insertByGroup(
Map<SessionConnection, T> recordsGroup, InsertConsumer<T> insertConsumer)
throws IoTDBConnectionException, StatementExecutionException {
List<CompletableFuture<Void>> completableFutures =
recordsGroup.entrySet().stream()
.map(
entry -> {
SessionConnection connection = entry.getKey();
T recordsReq = entry.getValue();
return CompletableFuture.runAsync(
() -> {
try {
insertConsumer.insert(connection, recordsReq);
} catch (RedirectException e) {
e.getDeviceEndPointMap().forEach(this::handleRedirection);
} catch (StatementExecutionException e) {
throw new CompletionException(e);
} catch (IoTDBConnectionException e) {
// remove the broken session
removeBrokenSessionConnection(connection);
try {
insertConsumer.insert(defaultSessionConnection, recordsReq);
} catch (IoTDBConnectionException | StatementExecutionException ex) {
throw new CompletionException(ex);
} catch (RedirectException ignored) {
}
}
},
OPERATION_EXECUTOR);
})
.collect(Collectors.toList());
StringBuilder errMsgBuilder = new StringBuilder();
for (CompletableFuture<Void> completableFuture : completableFutures) {
try {
completableFuture.join();
} catch (CompletionException completionException) {
Throwable cause = completionException.getCause();
logger.error("Meet error when async insert!", cause);
if (cause instanceof IoTDBConnectionException) {
throw (IoTDBConnectionException) cause;
} else {
errMsgBuilder.append(cause.getMessage());
}
}
}
if (errMsgBuilder.length() > 0) {
throw new StatementExecutionException(errMsgBuilder.toString());
}
}
@Override
public boolean isEnableQueryRedirection() {
return enableQueryRedirection;
}
@Override
public void setEnableQueryRedirection(boolean enableQueryRedirection) {
this.enableQueryRedirection = enableQueryRedirection;
}
@Override
public boolean isEnableRedirection() {
return enableRedirection;
}
@Override
public void setEnableRedirection(boolean enableRedirection) {
this.enableRedirection = enableRedirection;
}
@Override
public TSBackupConfigurationResp getBackupConfiguration()
throws IoTDBConnectionException, StatementExecutionException {
return defaultSessionConnection.getBackupConfiguration();
}
@Override
public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
return defaultSessionConnection.fetchAllConnections();
}
public static class Builder {
private String host = SessionConfig.DEFAULT_HOST;
private int rpcPort = SessionConfig.DEFAULT_PORT;
private String username = SessionConfig.DEFAULT_USER;
private String pw = SessionConfig.DEFAULT_PASSWORD;
private int fetchSize = SessionConfig.DEFAULT_FETCH_SIZE;
private ZoneId zoneId = null;
private int thriftDefaultBufferSize = SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY;
private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
private boolean enableRedirection = SessionConfig.DEFAULT_REDIRECTION_MODE;
private boolean enableRecordsAutoConvertTablet =
SessionConfig.DEFAULT_RECORDS_AUTO_CONVERT_TABLET;
private Version version = SessionConfig.DEFAULT_VERSION;
private long timeOut = SessionConfig.DEFAULT_QUERY_TIME_OUT;
private boolean enableAutoFetch = SessionConfig.DEFAULT_ENABLE_AUTO_FETCH;
private boolean useSSL = false;
private String trustStore;
private String trustStorePwd;
private int maxRetryCount = SessionConfig.MAX_RETRY_COUNT;
private long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS;
public Builder useSSL(boolean useSSL) {
this.useSSL = useSSL;
return this;
}
public Builder trustStore(String keyStore) {
this.trustStore = keyStore;
return this;
}
public Builder trustStorePwd(String keyStorePwd) {
this.trustStorePwd = keyStorePwd;
return this;
}
private List<String> nodeUrls = null;
public Builder host(String host) {
this.host = host;
return this;
}
public Builder port(int port) {
this.rpcPort = port;
return this;
}
public Builder username(String username) {
this.username = username;
return this;
}
public Builder password(String password) {
this.pw = password;
return this;
}
public Builder fetchSize(int fetchSize) {
this.fetchSize = fetchSize;
return this;
}
public Builder zoneId(ZoneId zoneId) {
this.zoneId = zoneId;
return this;
}
public Builder thriftDefaultBufferSize(int thriftDefaultBufferSize) {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
return this;
}
public Builder thriftMaxFrameSize(int thriftMaxFrameSize) {
this.thriftMaxFrameSize = thriftMaxFrameSize;
return this;
}
public Builder enableRedirection(boolean enableRedirection) {
this.enableRedirection = enableRedirection;
return this;
}
public Builder enableRecordsAutoConvertTablet(boolean enableRecordsAutoConvertTablet) {
this.enableRecordsAutoConvertTablet = enableRecordsAutoConvertTablet;
return this;
}
public Builder nodeUrls(List<String> nodeUrls) {
this.nodeUrls = nodeUrls;
return this;
}
public Builder version(Version version) {
this.version = version;
return this;
}
public Builder timeOut(long timeOut) {
this.timeOut = timeOut;
return this;
}
public Builder enableAutoFetch(boolean enableAutoFetch) {
this.enableAutoFetch = enableAutoFetch;
return this;
}
public Builder maxRetryCount(int maxRetryCount) {
this.maxRetryCount = maxRetryCount;
return this;
}
public Builder retryIntervalInMs(long retryIntervalInMs) {
this.retryIntervalInMs = retryIntervalInMs;
return this;
}
public Session build() {
if (nodeUrls != null
&& (!SessionConfig.DEFAULT_HOST.equals(host) || rpcPort != SessionConfig.DEFAULT_PORT)) {
throw new IllegalArgumentException(
"You should specify either nodeUrls or (host + rpcPort), but not both");
}
Session newSession = new Session(this);
return newSession;
}
}
}