blob: cf303814934cfa29f3ec9801d080e379538eb472 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.cluster;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSIService.Client;
import org.apache.iotdb.service.rpc.thrift.TSIService.Client.Factory;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@SuppressWarnings("java:S106")
public class ClientMain {
private static final Logger logger = LoggerFactory.getLogger(ClientMain.class);
private static final String PARAM_INSERTION = "i";
private static final String PARAM_QUERY = "q";
private static final String PARAM_DELETE_STORAGE_GROUP = "dsg";
private static final String PARAM_DELETE_SERIES = "ds";
private static final String PARAM_QUERY_PORTS = "qp";
private static final String PARAM_INSERT_PORT = "ip";
private static final String PARAM_BATCH = "b";
private static Options options = new Options();
private static String ip = "127.0.0.1";
private static int port = 6667;
static {
options.addOption(new Option(PARAM_INSERTION, "Perform insertion"));
options.addOption(new Option(PARAM_QUERY, "Perform query"));
options.addOption(new Option(PARAM_DELETE_SERIES, "Perform deleting timeseries"));
options.addOption(new Option(PARAM_DELETE_STORAGE_GROUP, "Perform deleting storage group"));
options.addOption(
new Option(PARAM_QUERY_PORTS, true, "Ports to query (ip is currently " + "localhost)"));
options.addOption(new Option(PARAM_INSERT_PORT, true, "Port to perform insertion"));
options.addOption(new Option(PARAM_BATCH, "Test batch statement"));
}
private static Map<String, TSStatus> failedQueries;
private static final String[] STORAGE_GROUPS =
new String[] {
"root.beijing", "root.shanghai", "root.guangzhou", "root.shenzhen",
};
private static final String[] DEVICES =
new String[] {
"root.beijing.d1", "root.shanghai.d1", "root.guangzhou.d1", "root.shenzhen.d1",
};
private static final String[] MEASUREMENTS = new String[] {"s1"};
private static final TSDataType[] DATA_TYPES = new TSDataType[] {TSDataType.DOUBLE};
private static List<IMeasurementSchema> schemas;
private static final String[] DATA_QUERIES =
new String[] {
// raw data multi series
"SELECT * FROM root",
"SELECT * FROM root WHERE time <= 691200000",
"SELECT * FROM root WHERE time >= 391200000 and time <= 691200000",
"SELECT * FROM root.*.* WHERE s1 <= 0.7",
// raw data single series
"SELECT s1 FROM root.beijing.d1",
"SELECT s1 FROM root.shanghai.d1",
"SELECT s1 FROM root.guangzhou.d1",
"SELECT s1 FROM root.shenzhen.d1",
// aggregation
"SELECT count(s1) FROM root.*.*",
"SELECT avg(s1) FROM root.*.*",
"SELECT sum(s1) FROM root.*.*",
"SELECT max_value(s1) FROM root.*.*",
"SELECT count(s1) FROM root.*.* where time <= 691200000",
"SELECT count(s1) FROM root.*.* where s1 <= 0.7",
// group by device
"SELECT * FROM root GROUP BY DEVICE",
// fill
"SELECT s1 FROM root.beijing.d1 WHERE time = 86400000 FILL (DOUBLE[PREVIOUS,1d])",
"SELECT s1 FROM root.shanghai.d1 WHERE time = 86400000 FILL (DOUBLE[LINEAR,1d,1d])",
"SELECT s1 FROM root.guangzhou.d1 WHERE time = 126400000 FILL (DOUBLE[PREVIOUS,1d])",
"SELECT s1 FROM root.shenzhen.d1 WHERE time = 126400000 FILL (DOUBLE[LINEAR,1d,1d])",
// group by
"SELECT COUNT(*) FROM root.*.* GROUP BY ([0, 864000000), 3d, 3d)",
"SELECT AVG(*) FROM root.*.* WHERE s1 <= 0.7 GROUP BY ([0, 864000000), 3d, 3d)",
// last
"SELECT LAST s1 FROM root.*.*",
};
private static final String[] META_QUERY =
new String[] {
"SHOW STORAGE GROUP",
"SHOW TIMESERIES root",
"COUNT TIMESERIES root",
"COUNT TIMESERIES root GROUP BY LEVEL=2",
"SHOW DEVICES",
"SHOW TIMESERIES root limit 1 offset 1",
};
public static void main(String[] args)
throws TException, StatementExecutionException, IoTDBConnectionException, ParseException,
SQLException, ClassNotFoundException {
CommandLineParser parser = new DefaultParser();
CommandLine commandLine = parser.parse(options, args);
boolean noOption = args.length == 0;
failedQueries = new HashMap<>();
prepareSchema();
if (commandLine.hasOption(PARAM_INSERT_PORT)) {
port = Integer.parseInt(commandLine.getOptionValue(PARAM_INSERT_PORT));
}
doInsertion(noOption, commandLine);
doQuery(noOption, commandLine);
doDeleteSeries(noOption, commandLine);
doDeleteSG(noOption, commandLine);
doBatchStmt(noOption, commandLine);
}
private static void doInsertion(boolean noOption, CommandLine commandLine) throws TException {
if (noOption || commandLine.hasOption(PARAM_INSERTION)) {
System.out.println("Test insertion");
Client client = getClient(ip, port);
long sessionId = connectClient(client);
testInsertion(client, sessionId);
client.closeSession(new TSCloseSessionReq(sessionId));
}
}
private static void doQuery(boolean noOption, CommandLine commandLine)
throws StatementExecutionException, TException, IoTDBConnectionException {
if (noOption || commandLine.hasOption(PARAM_QUERY)) {
int[] queryPorts = null;
if (commandLine.hasOption(PARAM_QUERY_PORTS)) {
queryPorts = parseIntArray(commandLine.getOptionValue(PARAM_QUERY_PORTS));
}
if (queryPorts == null) {
queryPorts = new int[] {port, port + 1, port + 2};
}
for (int queryPort : queryPorts) {
System.out.println("Test port: " + queryPort);
Client client = getClient(ip, queryPort);
long sessionId = connectClient(client);
System.out.println("Test data queries");
testQuery(client, sessionId, DATA_QUERIES);
System.out.println("Test metadata queries");
testQuery(client, sessionId, META_QUERY);
logger.info("Failed queries: {}", failedQueries);
client.closeSession(new TSCloseSessionReq(sessionId));
}
}
}
private static void doDeleteSeries(boolean noOption, CommandLine commandLine) throws TException {
if (noOption || commandLine.hasOption(PARAM_DELETE_SERIES)) {
System.out.println("Test delete timeseries");
Client client = getClient(ip, port);
long sessionId = connectClient(client);
testDeleteTimeseries(client, sessionId);
client.closeSession(new TSCloseSessionReq(sessionId));
}
}
private static void doDeleteSG(boolean noOption, CommandLine commandLine)
throws StatementExecutionException, TException, IoTDBConnectionException {
if (noOption || commandLine.hasOption(PARAM_DELETE_STORAGE_GROUP)) {
System.out.println("Test delete storage group");
Client client = getClient(ip, port);
long sessionId = connectClient(client);
testDeleteStorageGroup(client, sessionId);
client.closeSession(new TSCloseSessionReq(sessionId));
}
}
private static void doBatchStmt(boolean noOption, CommandLine commandLine)
throws SQLException, ClassNotFoundException {
if (noOption || commandLine.hasOption(PARAM_BATCH)) {
System.out.println("Test batch create sgs");
testBatch(ip, port);
}
}
private static int[] parseIntArray(String str) {
if (str == null) {
return new int[0];
}
String[] split = str.split(",");
int[] ret = new int[split.length];
for (int i = 0; i < split.length; i++) {
ret[i] = Integer.parseInt(split[i]);
}
return ret;
}
private static long connectClient(Client client) throws TException {
TSOpenSessionReq openReq =
new TSOpenSessionReq(
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3, ZoneId.systemDefault().getId());
openReq.setUsername("root");
openReq.setPassword("root");
TSOpenSessionResp openResp = client.openSession(openReq);
return openResp.getSessionId();
}
@SuppressWarnings({"java:S2095"}) // the transport is used later
private static Client getClient(String ip, int port) throws TTransportException {
TSIService.Client.Factory factory = new Factory();
TTransport transport = RpcTransportFactory.INSTANCE.getTransport(new TSocket(ip, port));
transport.open();
TProtocol protocol =
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
? new TCompactProtocol(transport)
: new TBinaryProtocol(transport);
return factory.getClient(protocol);
}
private static void prepareSchema() {
schemas = new ArrayList<>();
for (String device : DEVICES) {
for (int i = 0; i < MEASUREMENTS.length; i++) {
String measurement = MEASUREMENTS[i];
schemas.add(
new MeasurementSchema(
device + IoTDBConstant.PATH_SEPARATOR + measurement, DATA_TYPES[i]));
}
}
}
private static void testQuery(Client client, long sessionId, String[] queries)
throws TException, StatementExecutionException, IoTDBConnectionException {
long statementId = client.requestStatementId(sessionId);
for (String dataQuery : queries) {
executeQuery(client, sessionId, dataQuery, statementId);
}
TSCloseOperationReq tsCloseOperationReq = new TSCloseOperationReq(sessionId);
tsCloseOperationReq.setStatementId(statementId);
client.closeOperation(tsCloseOperationReq);
}
private static void executeQuery(Client client, long sessionId, String query, long statementId)
throws TException, StatementExecutionException, IoTDBConnectionException {
if (logger.isInfoEnabled()) {
logger.info("{ {} }", query);
}
TSExecuteStatementResp resp =
client.executeQueryStatement(
new TSExecuteStatementReq(sessionId, query, statementId).setFetchSize(1000));
if (resp.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failedQueries.put(query, resp.status);
return;
}
long queryId = resp.getQueryId();
if (logger.isInfoEnabled()) {
logger.info(resp.columns.toString());
}
SessionDataSet dataSet =
new SessionDataSet(
query,
resp.getColumns(),
resp.getDataTypeList(),
resp.columnNameIndexMap,
queryId,
statementId,
client,
sessionId,
resp.queryDataSet,
false);
while (dataSet.hasNext()) {
if (logger.isInfoEnabled()) {
logger.info(dataSet.next().toString());
}
}
System.out.println();
TSCloseOperationReq tsCloseOperationReq = new TSCloseOperationReq(sessionId);
tsCloseOperationReq.setQueryId(queryId);
client.closeOperation(tsCloseOperationReq);
}
private static void testDeleteStorageGroup(Client client, long sessionId)
throws TException, StatementExecutionException, IoTDBConnectionException {
if (logger.isInfoEnabled()) {
logger.info(client.deleteStorageGroups(sessionId, Arrays.asList(STORAGE_GROUPS)).toString());
}
testQuery(client, sessionId, new String[] {"SELECT * FROM root"});
}
private static void registerTimeseries(long sessionId, Client client) throws TException {
TSCreateTimeseriesReq req = new TSCreateTimeseriesReq();
req.setSessionId(sessionId);
for (IMeasurementSchema schema : schemas) {
req.setDataType(schema.getType().ordinal());
req.setEncoding(schema.getEncodingType().ordinal());
req.setCompressor(schema.getCompressor().ordinal());
req.setPath(schema.getMeasurementId());
if (logger.isInfoEnabled()) {
logger.info(client.createTimeseries(req).toString());
}
}
}
@SuppressWarnings("ConstantConditions")
private static void testInsertion(Client client, long sessionId) throws TException {
for (String storageGroup : STORAGE_GROUPS) {
if (logger.isInfoEnabled()) {
logger.info(client.setStorageGroup(sessionId, storageGroup).toString());
}
}
registerTimeseries(sessionId, client);
TSInsertStringRecordReq insertReq = new TSInsertStringRecordReq();
insertReq.setMeasurements(Arrays.asList(MEASUREMENTS));
insertReq.setSessionId(sessionId);
for (int i = 0; i < 10; i++) {
List<String> values = new ArrayList<>(MEASUREMENTS.length);
insertReq.setTimestamp(i * 24 * 3600 * 1000L);
for (int i1 = 0; i1 < MEASUREMENTS.length; i1++) {
switch (DATA_TYPES[i1]) {
case DOUBLE:
values.add(Double.toString(i * 0.1));
break;
case BOOLEAN:
values.add(Boolean.toString(i % 2 == 0));
break;
case INT64:
values.add(Long.toString(i));
break;
case INT32:
values.add(Integer.toString(i));
break;
case FLOAT:
values.add(Float.toString(i * 0.1f));
break;
case TEXT:
values.add("S" + i);
break;
}
}
insertReq.setValues(values);
for (String device : DEVICES) {
insertReq.setDeviceId(device);
if (logger.isInfoEnabled()) {
logger.info(insertReq.toString());
logger.info(client.insertStringRecord(insertReq).toString());
}
}
}
}
private static void testDeleteTimeseries(Client client, long sessionId) throws TException {
List<String> paths = new ArrayList<>();
for (String measurement : MEASUREMENTS) {
for (String device : DEVICES) {
paths.add(device + "." + measurement);
}
}
if (logger.isInfoEnabled()) {
logger.info(client.deleteTimeseries(sessionId, paths).toString());
}
}
private static void testBatch(String ip, int port) throws ClassNotFoundException, SQLException {
Class.forName(Config.JDBC_DRIVER_NAME);
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + String.format("%s:%d/", ip, port), "root", "root");
Statement statement = connection.createStatement()) {
statement.addBatch("SET STORAGE GROUP TO root.batch1");
statement.addBatch("SET STORAGE GROUP TO root.batch2");
statement.addBatch("SET STORAGE GROUP TO root.batch3");
statement.addBatch("SET STORAGE GROUP TO root.batch4");
statement.executeBatch();
statement.clearBatch();
try (ResultSet set = statement.executeQuery("SHOW STORAGE GROUP")) {
int colNum = set.getMetaData().getColumnCount();
while (set.next()) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < colNum; i++) {
stringBuilder.append(set.getString(i + 1)).append(",");
}
System.out.println(stringBuilder);
}
}
}
}
}