blob: bae7195a9b767151f0ffbc35dc17ac80a61e8de9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tool;
import org.apache.iotdb.cli.type.ExitType;
import org.apache.iotdb.cli.utils.CliContext;
import org.apache.iotdb.cli.utils.IoTPrinter;
import org.apache.iotdb.cli.utils.JlineUtils;
import org.apache.iotdb.exception.ArgsErrorException;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.thrift.TException;
import org.jline.reader.LineReader;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Exports the results of IoTDB queries to CSV files or to SQL insert scripts.
*
* @version 1.0.0 20170719
*/
public class ExportData extends AbstractDataTool {
// Command-line option keys (*_ARGS) and the argument names shown in the help text (*_NAME).
private static final String TARGET_DIR_ARGS = "td";
private static final String TARGET_DIR_NAME = "targetDirectory";
private static final String TARGET_FILE_ARGS = "f";
private static final String TARGET_FILE_NAME = "targetFile";
private static final String SQL_FILE_ARGS = "s";
private static final String SQL_FILE_NAME = "sqlfile";
private static final String DATA_TYPE_ARGS = "datatype";
private static final String DATA_TYPE_NAME = "datatype";
private static final String QUERY_COMMAND_ARGS = "q";
private static final String QUERY_COMMAND_NAME = "queryCommand";
private static final String EXPORT_TYPE_ARGS = "type";
private static final String EXPORT_TYPE_NAME = "exportType";
// Value of the export-type option that selects SQL output instead of CSV (compared ignoring case).
private static final String EXPORT_SQL_TYPE_NAME = "sql";
private static final String ALIGNED_ARGS = "aligned";
private static final String ALIGNED_NAME = "create the aligned insert sql";
private static final String LINES_PER_FILE_ARGS = "linesPerFile";
private static final String LINES_PER_FILE_ARGS_NAME = "Lines Per File";
// Program name used in the help text and as the prefix of the interactive query prompt.
private static final String TSFILEDB_CLI_PREFIX = "ExportCsv";
// Base name of the dump files when no target file name is given on the command line.
private static final String DUMP_FILE_NAME_DEFAULT = "dump";
// Base name of the dump files; the statement index and the file index are appended to it.
private static String targetFile = DUMP_FILE_NAME_DEFAULT;
// Directory the dump files are written to; parseSpecialParams makes it end with a separator.
private static String targetDirectory;
// Whether CSV headers carry the data type of each series, e.g. "name(TYPE)".
private static Boolean needDataTypePrinted;
// Query given directly on the command line; null when the option is absent.
private static String queryCommand;
// Timestamp precision reported by the server session; set in main after the session is opened.
private static String timestampPrecision;
// Maximum number of result rows handled per dump file before a new file is started.
private static int linesPerFile = 10000;
// Timeout passed to every query; -1 presumably means "no explicit timeout" - confirm with the
// Session API.
private static long timeout = -1;
// Printer used for all user-facing console output of this tool.
private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
/**
 * Entry point of the export tool.
 *
 * <p>Parses the command line, opens a session and exports the result of one or more queries. The
 * queries are taken from the query command option if present, otherwise from the SQL file option,
 * otherwise from an interactive prompt (several statements may be separated by {@code ;}).
 *
 * <p>The method always terminates the JVM through {@code System.exit}: with {@code CODE_ERROR}
 * for invalid arguments, after printing the help text, or after a failure caught here; with
 * {@code CODE_OK} otherwise. Failures of an individual statement are reported by {@code
 * dumpResult} and do not change the exit code.
 *
 * @param args command-line arguments; when {@code null} or empty the help text is printed
 */
@SuppressWarnings({
  "squid:S3776",
  "squid:S2093"
}) // Suppress high Cognitive Complexity warning, ignore try-with-resources
public static void main(String[] args) {
  Options options = createOptions();
  HelpFormatter hf = new HelpFormatter();
  CommandLine commandLine = null;
  CommandLineParser parser = new DefaultParser();
  hf.setOptionComparator(null); // avoid reordering
  hf.setWidth(MAX_HELP_CONSOLE_WIDTH);

  if (args == null || args.length == 0) {
    ioTPrinter.println("Too few params input, please check the following hint.");
    hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
    System.exit(CODE_ERROR);
  }
  try {
    commandLine = parser.parse(options, args);
  } catch (ParseException e) {
    ioTPrinter.println(e.getMessage());
    hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
    System.exit(CODE_ERROR);
  }
  if (commandLine.hasOption(HELP_ARGS)) {
    hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
    System.exit(CODE_ERROR);
  }

  int exitCode = CODE_OK;
  try {
    parseBasicParams(commandLine);
    parseSpecialParams(commandLine);
    if (!checkTimeFormat()) {
      System.exit(CODE_ERROR);
    }
    session = new Session(host, Integer.parseInt(port), username, password);
    session.open(false);
    timestampPrecision = session.getTimestampPrecision();
    setTimeZone();

    if (queryCommand != null) {
      dumpResult(queryCommand, 0);
    } else {
      String sqlFile = commandLine.getOptionValue(SQL_FILE_ARGS);
      if (sqlFile != null) {
        dumpFromSqlFile(sqlFile);
      } else {
        LineReader lineReader =
            JlineUtils.getLineReader(
                new CliContext(System.in, System.out, System.err, ExitType.EXCEPTION),
                username,
                host,
                port);
        String sql = lineReader.readLine(TSFILEDB_CLI_PREFIX + "> please input query: ");
        ioTPrinter.println(sql);
        String[] values = sql.trim().split(";");
        for (int i = 0; i < values.length; i++) {
          // "a;;b" yields an empty statement; skip it instead of sending it to the server.
          // The index still advances so the dump file names keep matching statement positions.
          if (values[i].trim().isEmpty()) {
            continue;
          }
          dumpResult(values[i], i);
        }
      }
    }
  } catch (IOException e) {
    ioTPrinter.println("Failed to operate on file, because " + e.getMessage());
    exitCode = CODE_ERROR;
  } catch (NumberFormatException e) {
    // A non-numeric port (or other numeric option) used to escape as an uncaught exception.
    ioTPrinter.println("Invalid numeric argument: " + e.getMessage());
    exitCode = CODE_ERROR;
  } catch (ArgsErrorException e) {
    ioTPrinter.println("Invalid args: " + e.getMessage());
    exitCode = CODE_ERROR;
  } catch (IoTDBConnectionException | StatementExecutionException e) {
    ioTPrinter.println("Connect failed because " + e.getMessage());
    exitCode = CODE_ERROR;
  } catch (TException e) {
    ioTPrinter.println(
        "Can not get the timestamp precision from server because " + e.getMessage());
    exitCode = CODE_ERROR;
  } finally {
    if (session != null) {
      try {
        session.close();
      } catch (IoTDBConnectionException e) {
        exitCode = CODE_ERROR;
        ioTPrinter.println(
            "Encounter an error when closing session, error is: " + e.getMessage());
      }
    }
  }
  System.exit(exitCode);
}
/**
 * Reads the export-specific options from the parsed command line into the static state of this
 * tool.
 *
 * <p>Defaults applied when an option is absent: the dump file base name is {@code
 * DUMP_FILE_NAME_DEFAULT}, data types are printed in the CSV header, the time format is {@code
 * "default"}, and {@code timeout} / {@code linesPerFile} keep their current values. The target
 * directory is normalized to end with a path separator.
 *
 * @param commandLine the parsed command line
 * @throws ArgsErrorException if the required target directory is missing, if the timeout or the
 *     lines-per-file value is not a number, or if lines-per-file is not positive
 */
private static void parseSpecialParams(CommandLine commandLine) throws ArgsErrorException {
  targetDirectory = checkRequiredArg(TARGET_DIR_ARGS, TARGET_DIR_NAME, commandLine);
  if (!targetDirectory.endsWith("/") && !targetDirectory.endsWith("\\")) {
    targetDirectory += File.separator;
  }

  targetFile = commandLine.getOptionValue(TARGET_FILE_ARGS);
  if (targetFile == null) {
    targetFile = DUMP_FILE_NAME_DEFAULT;
  }

  // Boolean.valueOf(null) yields FALSE, never null, so the absent option has to be detected on
  // the raw string; otherwise the intended default of "true" can never take effect.
  String dataTypeValue = commandLine.getOptionValue(DATA_TYPE_ARGS);
  needDataTypePrinted = dataTypeValue == null ? Boolean.TRUE : Boolean.valueOf(dataTypeValue);

  queryCommand = commandLine.getOptionValue(QUERY_COMMAND_ARGS);
  exportType = commandLine.getOptionValue(EXPORT_TYPE_ARGS);

  String timeoutString = commandLine.getOptionValue(TIMEOUT_ARGS);
  if (timeoutString != null) {
    try {
      timeout = Long.parseLong(timeoutString);
    } catch (NumberFormatException e) {
      throw new ArgsErrorException("timeout must be an integer, but was: " + timeoutString);
    }
  }

  timeFormat = commandLine.getOptionValue(TIME_FORMAT_ARGS);
  if (timeFormat == null) {
    timeFormat = "default";
  }
  timeZoneID = commandLine.getOptionValue(TIME_ZONE_ARGS);

  String linesPerFileString = commandLine.getOptionValue(LINES_PER_FILE_ARGS);
  if (linesPerFileString != null) {
    int parsedLinesPerFile;
    try {
      parsedLinesPerFile = Integer.parseInt(linesPerFileString);
    } catch (NumberFormatException e) {
      throw new ArgsErrorException(
          LINES_PER_FILE_ARGS + " must be an integer, but was: " + linesPerFileString);
    }
    // A non-positive value would make the file writers start new files forever without ever
    // consuming a row.
    if (parsedLinesPerFile <= 0) {
      throw new ArgsErrorException(
          LINES_PER_FILE_ARGS + " must be positive, but was: " + linesPerFileString);
    }
    linesPerFile = parsedLinesPerFile;
  }
}
/**
 * Builds the set of command-line options understood by the export tool.
 *
 * <p>The options returned by {@code createNewOptions()} are extended with the export-specific
 * ones. The insertion order below is the order in which the help text lists them, because the
 * help formatter is configured not to sort options.
 *
 * @return the complete option set
 */
private static Options createOptions() {
  final Options exportOptions = createNewOptions();

  // Target directory: the only mandatory option added here.
  exportOptions.addOption(
      Option.builder(TARGET_DIR_ARGS)
          .required()
          .argName(TARGET_DIR_NAME)
          .hasArg()
          .desc("Target File Directory (required)")
          .build());

  // Base name of the generated dump files.
  exportOptions.addOption(
      Option.builder(TARGET_FILE_ARGS)
          .argName(TARGET_FILE_NAME)
          .hasArg()
          .desc("Export file name (optional)")
          .build());

  // File containing the queries to export.
  exportOptions.addOption(
      Option.builder(SQL_FILE_ARGS)
          .argName(SQL_FILE_NAME)
          .hasArg()
          .desc("SQL File Path (optional)")
          .build());

  // Rendering of the time column.
  exportOptions.addOption(
      Option.builder(TIME_FORMAT_ARGS)
          .argName(TIME_FORMAT_NAME)
          .hasArg()
          .desc(
              "Output time Format in csv file. "
                  + "You can choose 1) timestamp, number, long 2) ISO8601, default 3) "
                  + "user-defined pattern like yyyy-MM-dd\\ HH:mm:ss, default ISO8601 (optional)")
          .build());

  // Time zone used when formatting timestamps.
  exportOptions.addOption(
      Option.builder(TIME_ZONE_ARGS)
          .argName(TIME_ZONE_NAME)
          .hasArg()
          .desc("Time Zone eg. +08:00 or -01:00 (optional)")
          .build());

  // Whether the CSV header carries the data type of each series.
  exportOptions.addOption(
      Option.builder(DATA_TYPE_ARGS)
          .argName(DATA_TYPE_NAME)
          .hasArg()
          .desc(
              "Will the data type of timeseries be printed in the head line of the CSV file?"
                  + '\n'
                  + "You can choose true) or false) . (optional)")
          .build());

  // Query passed directly on the command line.
  exportOptions.addOption(
      Option.builder(QUERY_COMMAND_ARGS)
          .argName(QUERY_COMMAND_NAME)
          .hasArg()
          .desc("The query command that you want to execute. (optional)")
          .build());

  // Output flavour: csv or sql.
  exportOptions.addOption(
      Option.builder(EXPORT_TYPE_ARGS)
          .argName(EXPORT_TYPE_NAME)
          .hasArg()
          .desc("Export file type ?" + '\n' + "You can choose csv) or sql) . (optional)")
          .build());

  // Aligned insert statements for SQL export.
  exportOptions.addOption(
      Option.builder(ALIGNED_ARGS)
          .argName(ALIGNED_NAME)
          .hasArg()
          .desc("Whether to use the interface of aligned (optional)")
          .build());

  // Size limit of a single dump file.
  exportOptions.addOption(
      Option.builder(LINES_PER_FILE_ARGS)
          .argName(LINES_PER_FILE_ARGS_NAME)
          .hasArg()
          .desc("Lines per dump file.")
          .build());

  // Help flag; takes no argument.
  exportOptions.addOption(
      Option.builder(HELP_ARGS)
          .longOpt(HELP_ARGS)
          .hasArg(false)
          .desc("Display help information")
          .build());

  // Query timeout.
  exportOptions.addOption(
      Option.builder(TIMEOUT_ARGS)
          .longOpt(TIMEOUT_NAME)
          .hasArg()
          .desc("Timeout for session query")
          .build());

  return exportOptions;
}
/**
 * Exports the result of every query found in a SQL file, one query per line.
 *
 * <p>Blank lines are skipped instead of being sent to the server as empty statements. The index
 * passed to {@code dumpResult} is the zero-based line number of the statement, so the name of a
 * dump file still identifies the line it was produced from, exactly as before.
 *
 * @param filePath sql file path
 * @throws IOException if the file cannot be opened or read
 */
private static void dumpFromSqlFile(String filePath) throws IOException {
  try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
    int index = 0;
    for (String sql = reader.readLine(); sql != null; sql = reader.readLine()) {
      if (!sql.trim().isEmpty()) {
        dumpResult(sql, index);
      }
      index++;
    }
  }
}
/**
 * Matches a call to an aggregate function, i.e. one of the listed function names followed by an
 * opening parenthesis. Compiled once because the check runs for every exported statement.
 */
private static final Pattern AGGREGATE_CALL_PATTERN =
    Pattern.compile(
        "\\b(count|sum|avg|extreme|max_value|min_value|first_value|last_value|max_time|min_time|stddev|stddev_pop|stddev_samp|variance|var_pop|var_samp|max_by|min_by)\\b\\s*\\(",
        Pattern.CASE_INSENSITIVE);

/**
 * Executes one query and dumps its result to CSV or SQL files.
 *
 * <p>The files are named {@code targetDirectory + targetFile + index} followed by a per-file
 * suffix added by the writer. For SQL export, statements that use an aggregate function are
 * rejected and not executed. Failures are reported on the console and do not propagate; the
 * query result is released in every case.
 *
 * @param sql export the result of executing the sql
 * @param index used to create dump file name
 */
private static void dumpResult(String sql, int index) {
  final boolean exportAsSql = EXPORT_SQL_TYPE_NAME.equalsIgnoreCase(exportType);
  // Previously the check only printed a message and the aggregate query was exported anyway.
  if (exportAsSql && !legalCheck(sql)) {
    return;
  }
  final String path = targetDirectory + targetFile + index;
  SessionDataSet sessionDataSet = null;
  try {
    sessionDataSet = session.executeQueryStatement(sql, timeout);
    List<String> names = sessionDataSet.getColumnNames();
    if (exportAsSql) {
      writeSqlFile(sessionDataSet, path, names, linesPerFile);
    } else {
      List<Object> headers = new ArrayList<>();
      if (Boolean.TRUE.equals(needDataTypePrinted)) {
        List<String> types = sessionDataSet.getColumnTypes();
        for (int i = 0; i < names.size(); i++) {
          String name = names.get(i);
          // The Time and Device columns are not series, so they carry no data type.
          if ("Time".equals(name) || "Device".equals(name)) {
            headers.add(name);
          } else {
            headers.add(String.format("%s(%s)", name, types.get(i)));
          }
        }
      } else {
        headers.addAll(names);
      }
      writeCsvFile(sessionDataSet, path, headers, linesPerFile);
    }
    // Clear the reference first so the finally block does not close the result a second time.
    SessionDataSet finished = sessionDataSet;
    sessionDataSet = null;
    finished.closeOperationHandle();
    ioTPrinter.println("Export completely!");
  } catch (StatementExecutionException | IoTDBConnectionException | IOException e) {
    ioTPrinter.println("Cannot dump result because: " + e.getMessage());
  } finally {
    // Only reached with a live result when writing failed: release it on the server side.
    if (sessionDataSet != null) {
      try {
        sessionDataSet.closeOperationHandle();
      } catch (StatementExecutionException | IoTDBConnectionException e) {
        ioTPrinter.println("Cannot close query result because: " + e.getMessage());
      }
    }
  }
}

/**
 * Checks that a statement can be exported as SQL, i.e. that it does not call an aggregate
 * function. A message is printed when the statement is rejected.
 *
 * @param sql the statement to check
 * @return {@code true} if the statement may be exported, {@code false} if it uses an aggregate
 */
private static boolean legalCheck(String sql) {
  Matcher matcher = AGGREGATE_CALL_PATTERN.matcher(sql.toUpperCase(Locale.ROOT));
  if (matcher.find()) {
    ioTPrinter.println("The sql you entered is invalid, please don't use aggregate query.");
    return false;
  }
  return true;
}
/**
 * Renders a timestamp according to the configured time format.
 *
 * <ul>
 *   <li>{@code "default"}: ISO offset date-time, honoring the timestamp precision;
 *   <li>{@code "timestamp"}, {@code "long"}, {@code "number"}: the raw number;
 *   <li>anything else: treated as a {@link DateTimeFormatter} pattern.
 * </ul>
 *
 * <p>The pattern branch used to interpret every value as milliseconds, which produced wrong
 * dates when the timestamp precision is microseconds or nanoseconds; it now converts according
 * to the precision as well.
 *
 * @param time the timestamp, expressed in the unit given by the timestamp precision
 * @return the formatted timestamp
 */
public static String timeTrans(Long time) {
  switch (timeFormat) {
    case "default":
      return RpcUtils.parseLongToDateWithPrecision(
          DateTimeFormatter.ISO_OFFSET_DATE_TIME, time, zoneId, timestampPrecision);
    case "timestamp":
    case "long":
    case "number":
      return String.valueOf(time);
    default:
      return ZonedDateTime.ofInstant(instantOfTimestamp(time), zoneId)
          .format(DateTimeFormatter.ofPattern(timeFormat));
  }
}

/**
 * Converts a raw timestamp to an {@link Instant} using the timestamp precision. A precision of
 * {@code "us"} or {@code "ns"} selects microseconds or nanoseconds; any other value, including an
 * unset precision, is treated as milliseconds.
 */
private static Instant instantOfTimestamp(long time) {
  if ("us".equals(timestampPrecision)) {
    // floorDiv/floorMod keep the nano adjustment non-negative for timestamps before the epoch.
    return Instant.ofEpochSecond(
        Math.floorDiv(time, 1_000_000L), Math.floorMod(time, 1_000_000L) * 1_000L);
  }
  if ("ns".equals(timestampPrecision)) {
    return Instant.ofEpochSecond(
        Math.floorDiv(time, 1_000_000_000L), Math.floorMod(time, 1_000_000_000L));
  }
  return Instant.ofEpochMilli(time);
}
/**
 * Writes a query result to one or more CSV files of at most {@code linesPerFile} rows each.
 *
 * <p>The files are named {@code filePath + "_" + fileIndex + ".csv"} and each starts with the
 * header record. The first file is always created, even for an empty result; further files are
 * only created while rows remain, so no trailing header-only file is produced when the row count
 * is a multiple of {@code linesPerFile}. Every file is closed even if writing fails.
 *
 * @param sessionDataSet the query result to export
 * @param filePath path prefix of the files to write
 * @param headers the header record
 * @param linesPerFile maximum number of rows per file, must be positive
 * @throws IllegalArgumentException if {@code linesPerFile} is not positive
 * @throws IOException if a file cannot be written
 * @throws IoTDBConnectionException if fetching rows fails
 * @throws StatementExecutionException if fetching rows fails
 */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public static void writeCsvFile(
    SessionDataSet sessionDataSet, String filePath, List<Object> headers, int linesPerFile)
    throws IOException, IoTDBConnectionException, StatementExecutionException {
  if (linesPerFile <= 0) {
    // With a non-positive limit no row would ever be consumed and files would be created forever.
    throw new IllegalArgumentException("linesPerFile must be positive, but was " + linesPerFile);
  }
  final boolean hasTimeColumn = headers != null && headers.contains("Time");
  int fileIndex = 0;
  // One row is read ahead so that a new file is only opened when there is something to put in it.
  RowRecord pending = sessionDataSet.hasNext() ? sessionDataSet.next() : null;
  do {
    final String finalFilePath = filePath + "_" + fileIndex + ".csv";
    final CSVPrinterWrapper csvPrinterWrapper = new CSVPrinterWrapper(finalFilePath);
    try {
      csvPrinterWrapper.printRecord(headers);
      for (int written = 0; pending != null && written < linesPerFile; written++) {
        writeCsvRow(csvPrinterWrapper, pending, hasTimeColumn);
        pending = sessionDataSet.hasNext() ? sessionDataSet.next() : null;
      }
      csvPrinterWrapper.flush();
    } finally {
      csvPrinterWrapper.close();
    }
    fileIndex++;
  } while (pending != null);
}

/**
 * Prints one result row as a CSV record: the time cell first (when applicable), then every field.
 */
private static void writeCsvRow(
    CSVPrinterWrapper csvPrinterWrapper, RowRecord rowRecord, boolean hasTimeColumn)
    throws IOException {
  // A timestamp of 0 alone is not a reliable sign of "no time column": a row stored at time 0
  // would lose its time cell and shift all following cells. The header decides; the old check
  // is kept as a fallback for results without a "Time" header.
  if (hasTimeColumn || rowRecord.getTimestamp() != 0) {
    csvPrinterWrapper.print(timeTrans(rowRecord.getTimestamp()));
  }
  for (Field field : rowRecord.getFields()) {
    String fieldStringValue = field.getStringValue();
    if ("null".equals(fieldStringValue)) {
      csvPrinterWrapper.print("");
      continue;
    }
    // TEXT values are wrapped in double quotes, except values that start with "root.".
    if (field.getDataType() == TSDataType.TEXT && !fieldStringValue.startsWith("root.")) {
      fieldStringValue = "\"" + fieldStringValue + "\"";
    }
    csvPrinterWrapper.print(fieldStringValue);
  }
  csvPrinterWrapper.println();
}
/**
 * Writes a query result as {@code INSERT} statements to one or more SQL files, handling at most
 * {@code linesPerFile} result rows per file.
 *
 * <p>The files are named {@code filePath + "_" + fileIndex + ".sql"}. The first file is always
 * created (it stays empty when there is nothing to export); further files are only created while
 * rows remain. Rows of devices or series below {@code root.__system} are skipped, as are {@code
 * null} values and series that {@code SHOW TIMESERIES} does not return. Every file is closed
 * even if writing fails.
 *
 * @param sessionDataSet the query result to export
 * @param filePath path prefix of the files to write
 * @param headers column names of the result; a "Device" column marks a per-device result
 * @param linesPerFile maximum number of result rows per file, must be positive
 * @throws IllegalArgumentException if {@code linesPerFile} is not positive
 * @throws IOException if a file cannot be written
 * @throws IoTDBConnectionException if fetching rows or series metadata fails
 * @throws StatementExecutionException if fetching rows or series metadata fails
 */
public static void writeSqlFile(
    SessionDataSet sessionDataSet, String filePath, List<String> headers, int linesPerFile)
    throws IOException, IoTDBConnectionException, StatementExecutionException {
  if (linesPerFile <= 0) {
    // With a non-positive limit no row would ever be consumed and files would be created forever.
    throw new IllegalArgumentException("linesPerFile must be positive, but was " + linesPerFile);
  }
  if (CollectionUtils.isEmpty(headers) || headers.size() <= 1) {
    // Nothing to export: keep producing the empty first file, but close it instead of leaking
    // the writer.
    new FileWriter(filePath + "_0.sql").close();
    return;
  }

  final boolean alignByDevice = headers.contains("Device");
  final List<String> seriesList = new ArrayList<>(headers);
  String deviceName = null;
  seriesList.remove("Time");
  if (alignByDevice) {
    seriesList.remove("Device");
  } else {
    // All columns are full series paths; the device is taken from the first one and the
    // columns are reduced to their measurement names.
    deviceName = new Path(headers.get(1), true).getDevice();
    for (int i = 0; i < seriesList.size(); i++) {
      seriesList.set(i, new Path(seriesList.get(i), true).getMeasurement());
    }
  }

  // Whether a series is TEXT is looked up once per series instead of once per value.
  final Map<String, Boolean> textSeriesCache = new HashMap<>();
  int fileIndex = 0;
  // One row is read ahead so that a new file is only opened when there is something to put in it.
  RowRecord pending = sessionDataSet.hasNext() ? sessionDataSet.next() : null;
  do {
    final String finalFilePath = filePath + "_" + fileIndex + ".sql";
    try (FileWriter writer = new FileWriter(finalFilePath)) {
      for (int consumed = 0; pending != null && consumed < linesPerFile; consumed++) {
        String insert =
            buildInsertStatement(
                pending, headers, seriesList, alignByDevice, deviceName, textSeriesCache);
        if (insert != null) {
          writer.write(insert);
        }
        pending = sessionDataSet.hasNext() ? sessionDataSet.next() : null;
      }
      writer.flush();
    }
    fileIndex++;
  } while (pending != null);
}

/**
 * Builds the INSERT statement for one result row, or returns {@code null} when the row is
 * skipped (system device/series, or no value left to insert).
 */
private static String buildInsertStatement(
    RowRecord rowRecord,
    List<String> headers,
    List<String> seriesList,
    boolean alignByDevice,
    String defaultDeviceName,
    Map<String, Boolean> textSeriesCache)
    throws IoTDBConnectionException, StatementExecutionException {
  List<Field> fields = rowRecord.getFields();
  List<String> headersTemp = new ArrayList<>(seriesList);
  List<String> timeseries = new ArrayList<>();
  String deviceName = defaultDeviceName;
  if (alignByDevice) {
    // In a per-device result the first field of every row is the device name.
    deviceName = fields.get(0).toString();
    if (deviceName.startsWith("root.__system")) {
      return null;
    }
    for (String header : headersTemp) {
      timeseries.add(deviceName + "." + header);
    }
  } else {
    if (headers.get(1).startsWith("root.__system")) {
      return null;
    }
    timeseries.addAll(headers);
    timeseries.remove(0); // drop the leading Time column
  }

  // NOTE(review): ALIGNED_ARGS is the option *name* (a String), so this comparison is always
  // false and ALIGNED statements are never generated. The parsed value of the option has to be
  // stored by the argument parsing and checked here instead - confirm the intended source.
  String sqlMiddle;
  if (Boolean.TRUE.equals(ALIGNED_ARGS)) {
    sqlMiddle = " ALIGNED VALUES (" + timeTrans(rowRecord.getTimestamp()) + ",";
  } else {
    sqlMiddle = " VALUES (" + timeTrans(rowRecord.getTimestamp()) + ",";
  }

  List<String> values = new ArrayList<>();
  if (alignByDevice) {
    fields.remove(0);
  }
  for (int index = 0; index < fields.size(); index++) {
    Boolean isText = lookupIsTextSeries(timeseries.get(index), textSeriesCache);
    if (isText == null) {
      // Unknown series: leave its column out of the statement.
      headersTemp.remove(seriesList.get(index));
      continue;
    }
    String value = fields.get(index).toString();
    if (value.equals("null")) {
      headersTemp.remove(seriesList.get(index));
      continue;
    }
    values.add(isText ? "\"" + value + "\"" : value);
  }

  if (CollectionUtils.isEmpty(headersTemp)) {
    return null;
  }
  return "INSERT INTO "
      + deviceName
      + " (TIMESTAMP,"
      + String.join(",", headersTemp)
      + ")"
      + sqlMiddle
      + String.join(",", values)
      + ");\n";
}

/**
 * Tells whether a series has the TEXT data type, using the first row returned by {@code SHOW
 * TIMESERIES}. Results, including "not found", are cached, and the metadata result is released
 * after use.
 *
 * @return {@code TRUE} or {@code FALSE} for a known series, {@code null} if no row is returned
 */
private static Boolean lookupIsTextSeries(String seriesPath, Map<String, Boolean> cache)
    throws IoTDBConnectionException, StatementExecutionException {
  if (cache.containsKey(seriesPath)) {
    return cache.get(seriesPath);
  }
  Boolean isText = null;
  SessionDataSet seriesInfo =
      session.executeQueryStatement("SHOW TIMESERIES " + seriesPath, timeout);
  try {
    RowRecord next = seriesInfo.next();
    if (ObjectUtils.isNotEmpty(next)) {
      // The field at index 3 of the row is compared against the data type name.
      isText = "TEXT".equalsIgnoreCase(next.getFields().get(3).getStringValue());
    }
  } finally {
    seriesInfo.closeOperationHandle();
  }
  cache.put(seriesPath, isText);
  return isText;
}
}