/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| |
| package org.apache.iotdb.tool; |
| |
| import org.apache.iotdb.cli.utils.IoTPrinter; |
| import org.apache.iotdb.commons.exception.IllegalPathException; |
| import org.apache.iotdb.commons.utils.PathUtils; |
| import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; |
| import org.apache.iotdb.db.utils.DateTimeUtils; |
| import org.apache.iotdb.db.utils.constant.SqlConstant; |
| import org.apache.iotdb.exception.ArgsErrorException; |
| import org.apache.iotdb.isession.SessionDataSet; |
| import org.apache.iotdb.jdbc.Config; |
| import org.apache.iotdb.rpc.IoTDBConnectionException; |
| import org.apache.iotdb.rpc.StatementExecutionException; |
| import org.apache.iotdb.session.Session; |
| import org.apache.iotdb.tsfile.common.constant.TsFileConstant; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.read.common.Field; |
| import org.apache.iotdb.tsfile.read.common.RowRecord; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.CommandLineParser; |
| import org.apache.commons.cli.DefaultParser; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Option; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.csv.CSVFormat; |
| import org.apache.commons.csv.CSVParser; |
| import org.apache.commons.csv.CSVRecord; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.thrift.annotation.Nullable; |
| |
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
| |
| import static org.apache.iotdb.jdbc.Config.IOTDB_ERROR_PREFIX; |
| import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.BOOLEAN; |
| import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE; |
| import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.FLOAT; |
| import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT32; |
| import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT64; |
| import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.TEXT; |
| |
/**
 * Command-line tool that imports CSV/TXT or SQL files into IoTDB. CSV input may be laid out
 * "aligned by time" (each column is a full timeseries path) or "aligned by device" (a Device
 * column plus measurement columns).
 */
public class ImportData extends AbstractDataTool {

  // -f: input file, or a directory whose files are imported one by one (required).
  private static final String FILE_ARGS = "f";
  private static final String FILE_NAME = "file or folder";

  // -fd: directory where rejected lines are written back as ".failed" CSV files.
  private static final String FAILED_FILE_ARGS = "fd";
  private static final String FAILED_FILE_NAME = "failed file directory";

  // -batch: number of data points buffered before a batch insert is flushed.
  private static final String BATCH_POINT_SIZE_ARGS = "batch";
  private static final String BATCH_POINT_SIZE_NAME = "batch point size";

  // -aligned: use the aligned-timeseries insert interface instead of the plain one.
  private static final String ALIGNED_ARGS = "aligned";
  private static final String ALIGNED_NAME = "use the aligned interface";

  // Recognized input-file suffixes; anything else is rejected (CSV path) or routed by suffix.
  private static final String CSV_SUFFIXS = "csv";
  private static final String TXT_SUFFIXS = "txt";

  private static final String SQL_SUFFIXS = "sql";

  // -tp: timestamp precision of the server (ms/us/ns).
  private static final String TIMESTAMP_PRECISION_ARGS = "tp";
  private static final String TIMESTAMP_PRECISION_NAME = "timestamp precision (ms/us/ns)";

  // -typeInfer: user overrides for the inference table, e.g. "boolean=text,int=long".
  private static final String TYPE_INFER_ARGS = "typeInfer";
  private static final String TYPE_INFER_ARGS_NAME = "type infer";

  // -linesPerFailedFile: how many rejected lines go into each ".failed" file chunk.
  private static final String LINES_PER_FAILED_FILE_ARGS = "linesPerFailedFile";
  private static final String LINES_PER_FAILED_FILE_ARGS_NAME = "Lines Per FailedFile";

  private static final String TSFILEDB_CLI_PREFIX = "ImportCsv";

  private static String targetPath;
  private static String failedFileDirectory = null;
  private static int linesPerFailedFile = 10000;
  private static Boolean aligned = false;

  // Actual header names of the time/device columns as found in the CSV (case preserved);
  // parseHeaders overwrites these with the file's own spelling.
  private static String timeColumn = "Time";
  private static String deviceColumn = "Device";

  private static int batchPointSize = 100_000;

  private static String timestampPrecision = "ms";

  // Keys accepted on the left-hand side of a -typeInfer "key=value" expression.
  private static final String DATATYPE_BOOLEAN = "boolean";
  private static final String DATATYPE_INT = "int";
  private static final String DATATYPE_LONG = "long";
  private static final String DATATYPE_FLOAT = "float";
  private static final String DATATYPE_DOUBLE = "double";
  private static final String DATATYPE_NAN = "NaN";
  private static final String DATATYPE_TEXT = "text";

  private static final String DATATYPE_NULL = "null";
  private static int fetchSize = 1000;

  private static final String INSERT_CSV_MEET_ERROR_MSG = "Meet error when insert csv because ";

  // Inferred source category -> TSDataType actually written. Mutated by applyTypeInferArgs.
  private static final Map<String, TSDataType> TYPE_INFER_KEY_DICT = new HashMap<>();

  static {
    TYPE_INFER_KEY_DICT.put(DATATYPE_BOOLEAN, TSDataType.BOOLEAN);
    // NOTE(review): "int" maps to FLOAT and "long" to DOUBLE here, unlike the value dict
    // below — presumably a deliberate widening default for inferred values; confirm against
    // the documented -typeInfer semantics.
    TYPE_INFER_KEY_DICT.put(DATATYPE_INT, TSDataType.FLOAT);
    TYPE_INFER_KEY_DICT.put(DATATYPE_LONG, TSDataType.DOUBLE);
    TYPE_INFER_KEY_DICT.put(DATATYPE_FLOAT, TSDataType.FLOAT);
    TYPE_INFER_KEY_DICT.put(DATATYPE_DOUBLE, TSDataType.DOUBLE);
    TYPE_INFER_KEY_DICT.put(DATATYPE_NAN, TSDataType.DOUBLE);
  }

  // Names accepted on the right-hand side of a -typeInfer expression, with their exact types.
  private static final Map<String, TSDataType> TYPE_INFER_VALUE_DICT = new HashMap<>();

  static {
    TYPE_INFER_VALUE_DICT.put(DATATYPE_BOOLEAN, TSDataType.BOOLEAN);
    TYPE_INFER_VALUE_DICT.put(DATATYPE_INT, TSDataType.INT32);
    TYPE_INFER_VALUE_DICT.put(DATATYPE_LONG, TSDataType.INT64);
    TYPE_INFER_VALUE_DICT.put(DATATYPE_FLOAT, TSDataType.FLOAT);
    TYPE_INFER_VALUE_DICT.put(DATATYPE_DOUBLE, TSDataType.DOUBLE);
    TYPE_INFER_VALUE_DICT.put(DATATYPE_TEXT, TSDataType.TEXT);
  }

  private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
| |
| /** |
| * create the commandline options. |
| * |
| * @return object Options |
| */ |
| private static Options createOptions() { |
| Options options = createNewOptions(); |
| |
| Option opFile = |
| Option.builder(FILE_ARGS) |
| .required() |
| .argName(FILE_NAME) |
| .hasArg() |
| .desc( |
| "If input a file path, load a csv file, " |
| + "otherwise load all csv file under this directory (required)") |
| .build(); |
| options.addOption(opFile); |
| |
| Option opFailedFile = |
| Option.builder(FAILED_FILE_ARGS) |
| .argName(FAILED_FILE_NAME) |
| .hasArg() |
| .desc( |
| "Specifying a directory to save failed file, default YOUR_CSV_FILE_PATH (optional)") |
| .build(); |
| options.addOption(opFailedFile); |
| |
| Option opAligned = |
| Option.builder(ALIGNED_ARGS) |
| .argName(ALIGNED_NAME) |
| .hasArg() |
| .desc("Whether to use the interface of aligned (optional)") |
| .build(); |
| options.addOption(opAligned); |
| |
| Option opHelp = |
| Option.builder(HELP_ARGS) |
| .longOpt(HELP_ARGS) |
| .hasArg(false) |
| .desc("Display help information") |
| .build(); |
| options.addOption(opHelp); |
| |
| Option opTimeZone = |
| Option.builder(TIME_ZONE_ARGS) |
| .argName(TIME_ZONE_NAME) |
| .hasArg() |
| .desc("Time Zone eg. +08:00 or -01:00 (optional)") |
| .build(); |
| options.addOption(opTimeZone); |
| |
| Option opBatchPointSize = |
| Option.builder(BATCH_POINT_SIZE_ARGS) |
| .argName(BATCH_POINT_SIZE_NAME) |
| .hasArg() |
| .desc("100000 (optional)") |
| .build(); |
| options.addOption(opBatchPointSize); |
| |
| Option opTimestampPrecision = |
| Option.builder(TIMESTAMP_PRECISION_ARGS) |
| .argName(TIMESTAMP_PRECISION_NAME) |
| .hasArg() |
| .desc("Timestamp precision (ms/us/ns)") |
| .build(); |
| |
| options.addOption(opTimestampPrecision); |
| |
| Option opTypeInfer = |
| Option.builder(TYPE_INFER_ARGS) |
| .argName(TYPE_INFER_ARGS_NAME) |
| .numberOfArgs(5) |
| .hasArgs() |
| .valueSeparator(',') |
| .desc("Define type info by option:\"boolean=text,int=long, ...") |
| .build(); |
| options.addOption(opTypeInfer); |
| |
| Option opFailedLinesPerFile = |
| Option.builder(LINES_PER_FAILED_FILE_ARGS) |
| .argName(LINES_PER_FAILED_FILE_ARGS_NAME) |
| .hasArgs() |
| .desc("Lines per failed file") |
| .build(); |
| options.addOption(opFailedLinesPerFile); |
| |
| return options; |
| } |
| |
| /** |
| * parse optional params |
| * |
| * @param commandLine |
| */ |
| private static void parseSpecialParams(CommandLine commandLine) throws ArgsErrorException { |
| timeZoneID = commandLine.getOptionValue(TIME_ZONE_ARGS); |
| targetPath = commandLine.getOptionValue(FILE_ARGS); |
| if (commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS) != null) { |
| batchPointSize = Integer.parseInt(commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS)); |
| } |
| if (commandLine.getOptionValue(FAILED_FILE_ARGS) != null) { |
| failedFileDirectory = commandLine.getOptionValue(FAILED_FILE_ARGS); |
| File file = new File(failedFileDirectory); |
| if (!file.isDirectory()) { |
| file.mkdir(); |
| failedFileDirectory = file.getAbsolutePath() + File.separator; |
| } |
| } |
| if (commandLine.getOptionValue(ALIGNED_ARGS) != null) { |
| aligned = Boolean.valueOf(commandLine.getOptionValue(ALIGNED_ARGS)); |
| } |
| |
| if (commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS) != null) { |
| timestampPrecision = commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS); |
| } |
| final String[] opTypeInferValues = commandLine.getOptionValues(TYPE_INFER_ARGS); |
| if (opTypeInferValues != null && opTypeInferValues.length > 0) { |
| for (String opTypeInferValue : opTypeInferValues) { |
| if (opTypeInferValue.contains("=")) { |
| final String[] typeInfoExpressionArr = opTypeInferValue.split("="); |
| final String key = typeInfoExpressionArr[0]; |
| final String value = typeInfoExpressionArr[1]; |
| applyTypeInferArgs(key, value); |
| } |
| } |
| } |
| if (commandLine.getOptionValue(LINES_PER_FAILED_FILE_ARGS) != null) { |
| linesPerFailedFile = Integer.parseInt(commandLine.getOptionValue(LINES_PER_FAILED_FILE_ARGS)); |
| } |
| } |
| |
| private static void applyTypeInferArgs(String key, String value) throws ArgsErrorException { |
| if (!TYPE_INFER_KEY_DICT.containsKey(key)) { |
| throw new ArgsErrorException("Unknown type infer key: " + key); |
| } |
| if (!TYPE_INFER_VALUE_DICT.containsKey(value)) { |
| throw new ArgsErrorException("Unknown type infer value: " + value); |
| } |
| if (key.equals(DATATYPE_NAN) |
| && !(value.equals(DATATYPE_FLOAT) |
| || value.equals(DATATYPE_DOUBLE) |
| || value.equals(DATATYPE_TEXT))) { |
| throw new ArgsErrorException("NaN can not convert to " + value); |
| } |
| if (key.equals(DATATYPE_BOOLEAN) |
| && !(value.equals(DATATYPE_BOOLEAN) || value.equals(DATATYPE_TEXT))) { |
| throw new ArgsErrorException("Boolean can not convert to " + value); |
| } |
| final TSDataType srcType = TYPE_INFER_VALUE_DICT.get(key); |
| final TSDataType dstType = TYPE_INFER_VALUE_DICT.get(value); |
| if (dstType.getType() < srcType.getType()) { |
| throw new ArgsErrorException(key + " can not convert to " + value); |
| } |
| TYPE_INFER_KEY_DICT.put(key, TYPE_INFER_VALUE_DICT.get(value)); |
| } |
| |
| public static void main(String[] args) throws IoTDBConnectionException { |
| Options options = createOptions(); |
| HelpFormatter hf = new HelpFormatter(); |
| hf.setOptionComparator(null); |
| hf.setWidth(MAX_HELP_CONSOLE_WIDTH); |
| CommandLine commandLine = null; |
| CommandLineParser parser = new DefaultParser(); |
| |
| if (args == null || args.length == 0) { |
| ioTPrinter.println("Too few params input, please check the following hint."); |
| hf.printHelp(TSFILEDB_CLI_PREFIX, options, true); |
| System.exit(CODE_ERROR); |
| } |
| try { |
| commandLine = parser.parse(options, args); |
| } catch (org.apache.commons.cli.ParseException e) { |
| ioTPrinter.println("Parse error: " + e.getMessage()); |
| hf.printHelp(TSFILEDB_CLI_PREFIX, options, true); |
| System.exit(CODE_ERROR); |
| } |
| if (commandLine.hasOption(HELP_ARGS)) { |
| hf.printHelp(TSFILEDB_CLI_PREFIX, options, true); |
| System.exit(CODE_ERROR); |
| } |
| |
| try { |
| parseBasicParams(commandLine); |
| String filename = commandLine.getOptionValue(FILE_ARGS); |
| if (filename == null) { |
| hf.printHelp(TSFILEDB_CLI_PREFIX, options, true); |
| System.exit(CODE_ERROR); |
| } |
| parseSpecialParams(commandLine); |
| } catch (ArgsErrorException e) { |
| ioTPrinter.println("Args error: " + e.getMessage()); |
| System.exit(CODE_ERROR); |
| } catch (Exception e) { |
| ioTPrinter.println("Encounter an error, because: " + e.getMessage()); |
| System.exit(CODE_ERROR); |
| } |
| |
| System.exit( |
| importFromTargetPath( |
| host, Integer.parseInt(port), username, password, targetPath, timeZoneID)); |
| } |
| |
| /** |
| * Specifying a CSV file or a directory including CSV files that you want to import. This method |
| * can be offered to console cli to implement importing CSV file by command. |
| * |
| * @param host |
| * @param port |
| * @param username |
| * @param password |
| * @param targetPath a CSV file or a directory including CSV files |
| * @param timeZone |
| * @return the status code |
| * @throws IoTDBConnectionException |
| */ |
| @SuppressWarnings({"squid:S2093"}) // ignore try-with-resources |
| public static int importFromTargetPath( |
| String host, int port, String username, String password, String targetPath, String timeZone) |
| throws IoTDBConnectionException { |
| try { |
| session = new Session(host, port, username, password, false); |
| session.open(false); |
| timeZoneID = timeZone; |
| setTimeZone(); |
| |
| File file = new File(targetPath); |
| if (file.isFile()) { |
| if (file.getName().endsWith(SQL_SUFFIXS)) { |
| importFromSqlFile(file); |
| } else { |
| importFromSingleFile(file); |
| } |
| } else if (file.isDirectory()) { |
| File[] files = file.listFiles(); |
| if (files == null) { |
| return CODE_OK; |
| } |
| |
| for (File subFile : files) { |
| if (subFile.isFile()) { |
| if (subFile.getName().endsWith(SQL_SUFFIXS)) { |
| importFromSqlFile(subFile); |
| } else { |
| importFromSingleFile(subFile); |
| } |
| } |
| } |
| } else { |
| ioTPrinter.println("File not found!"); |
| return CODE_ERROR; |
| } |
| } catch (IoTDBConnectionException | StatementExecutionException e) { |
| ioTPrinter.println("Encounter an error when connecting to server, because " + e.getMessage()); |
| return CODE_ERROR; |
| } finally { |
| if (session != null) { |
| session.close(); |
| } |
| } |
| return CODE_OK; |
| } |
| |
| /** |
| * import the CSV file and load headers and records. |
| * |
| * @param file the File object of the CSV file that you want to import. |
| */ |
| private static void importFromSingleFile(File file) { |
| if (file.getName().endsWith(CSV_SUFFIXS) || file.getName().endsWith(TXT_SUFFIXS)) { |
| try { |
| CSVParser csvRecords = readCsvFile(file.getAbsolutePath()); |
| List<String> headerNames = csvRecords.getHeaderNames(); |
| Stream<CSVRecord> records = csvRecords.stream(); |
| if (headerNames.isEmpty()) { |
| ioTPrinter.println("Empty file!"); |
| return; |
| } |
| if (!timeColumn.equalsIgnoreCase(filterBomHeader(headerNames.get(0)))) { |
| ioTPrinter.println("The first field of header must be `Time`!"); |
| return; |
| } |
| String failedFilePath = null; |
| if (failedFileDirectory == null) { |
| failedFilePath = file.getAbsolutePath() + ".failed"; |
| } else { |
| failedFilePath = failedFileDirectory + file.getName() + ".failed"; |
| } |
| if (!deviceColumn.equalsIgnoreCase(headerNames.get(1))) { |
| writeDataAlignedByTime(headerNames, records, failedFilePath); |
| } else { |
| writeDataAlignedByDevice(headerNames, records, failedFilePath); |
| } |
| } catch (IOException | IllegalPathException e) { |
| ioTPrinter.println("CSV file read exception because: " + e.getMessage()); |
| } |
| } else { |
| ioTPrinter.println("The file name must end with \"csv\" or \"txt\"!"); |
| } |
| } |
| |
| private static void importFromSqlFile(File file) { |
| try (BufferedReader br = new BufferedReader(new FileReader(file.getAbsolutePath()))) { |
| String line; |
| while ((line = br.readLine()) != null) { |
| executeSql(line); |
| } |
| ioTPrinter.println(file.getName() + " Import completely!"); |
| } catch (IOException e) { |
| ioTPrinter.println("SQL file read exception because: " + e.getMessage()); |
| } |
| } |
| |
| private static void executeSql(String sql) { |
| try (Statement statement = getConnection().createStatement()) { |
| statement.setFetchSize(fetchSize); |
| statement.execute(sql.trim()); |
| } catch (SQLException e) { |
| ioTPrinter.println(IOTDB_ERROR_PREFIX + " Can't execute sql because " + e.getMessage()); |
| System.exit(CODE_ERROR); |
| } |
| } |
| |
| private static Connection getConnection() { |
| // JDBC driver name and database URL |
| String driver = org.apache.iotdb.jdbc.IoTDBDriver.class.getName(); |
| String url = Config.IOTDB_URL_PREFIX + host + ":" + port + "/"; |
| |
| Connection connection = null; |
| try { |
| Class.forName(driver); |
| connection = DriverManager.getConnection(url, username, password); |
| } catch (ClassNotFoundException e) { |
| e.printStackTrace(); |
| } catch (SQLException e) { |
| e.printStackTrace(); |
| } |
| return connection; |
| } |
| |
  /**
   * Write CSV records laid out "aligned by time": the first column is Time and every other
   * column is a full timeseries path, optionally suffixed with a type like
   * "root.sg.d1.s1(INT32)". Rows are buffered per device and flushed in batches; rows with
   * unparsable values are collected and written to a ".failed" file.
   *
   * @param headerNames the header names of CSV file
   * @param records the records of CSV file
   * @param failedFilePath the directory to save the failed files
   * @throws IllegalPathException if a header is not a valid timeseries path
   */
  @SuppressWarnings("squid:S3776")
  private static void writeDataAlignedByTime(
      List<String> headerNames, Stream<CSVRecord> records, String failedFilePath)
      throws IllegalPathException {
    HashMap<String, List<String>> deviceAndMeasurementNames = new HashMap<>();
    HashMap<String, TSDataType> headerTypeMap = new HashMap<>();
    HashMap<String, String> headerNameMap = new HashMap<>();
    parseHeaders(headerNames, deviceAndMeasurementNames, headerTypeMap, headerNameMap);

    Set<String> devices = deviceAndMeasurementNames.keySet();
    // No "(TYPE)" suffixes in the header: look existing timeseries types up in IoTDB instead.
    if (headerTypeMap.isEmpty()) {
      queryType(devices, headerTypeMap, "Time");
    }

    // Parallel buffers consumed (and cleared) by writeAndEmptyDataSet.
    List<String> deviceIds = new ArrayList<>();
    List<Long> times = new ArrayList<>();
    List<List<String>> measurementsList = new ArrayList<>();
    List<List<TSDataType>> typesList = new ArrayList<>();
    List<List<Object>> valuesList = new ArrayList<>();

    // AtomicReference/AtomicInteger are used for mutability inside the lambda, not for
    // thread-safety (the stream is sequential).
    AtomicReference<Boolean> hasStarted = new AtomicReference<>(false);
    AtomicInteger pointSize = new AtomicInteger(0);

    ArrayList<List<Object>> failedRecords = new ArrayList<>();

    records.forEach(
        recordObj -> {
          if (Boolean.FALSE.equals(hasStarted.get())) {
            hasStarted.set(true);
          } else if (pointSize.get() >= batchPointSize) {
            // Flush happens at the start of the NEXT record, so a batch may exceed
            // batchPointSize by up to one record's worth of points. Retry up to 3 times.
            writeAndEmptyDataSet(deviceIds, times, typesList, valuesList, measurementsList, 3);
            pointSize.set(0);
          }

          // True if any cell of this record failed type inference or conversion.
          boolean isFail = false;

          for (Map.Entry<String, List<String>> entry : deviceAndMeasurementNames.entrySet()) {
            String deviceId = entry.getKey();
            List<String> measurementNames = entry.getValue();
            ArrayList<TSDataType> types = new ArrayList<>();
            ArrayList<Object> values = new ArrayList<>();
            ArrayList<String> measurements = new ArrayList<>();
            for (String measurement : measurementNames) {
              String header = deviceId + "." + measurement;
              // headerNameMap maps the type-stripped path back to the raw CSV column name.
              String value = recordObj.get(headerNameMap.get(header));
              if (!"".equals(value)) {
                TSDataType type;
                if (!headerTypeMap.containsKey(header)) {
                  // Unknown column type: infer from the first non-empty value and cache it.
                  type = typeInfer(value);
                  if (type != null) {
                    headerTypeMap.put(header, type);
                  } else {
                    ioTPrinter.printf(
                        "Line '%s', column '%s': '%s' unknown type%n",
                        recordObj.getRecordNumber(), header, value);
                    isFail = true;
                  }
                }
                type = headerTypeMap.get(header);
                if (type != null) {
                  Object valueTrans = typeTrans(value, type);
                  if (valueTrans == null) {
                    isFail = true;
                    ioTPrinter.printf(
                        "Line '%s', column '%s': '%s' can't convert to '%s'%n",
                        recordObj.getRecordNumber(), header, value, type);
                  } else {
                    // Strip the device prefix to get the bare measurement name.
                    measurements.add(header.replace(deviceId + '.', ""));
                    types.add(type);
                    values.add(valueTrans);
                    pointSize.getAndIncrement();
                  }
                }
              }
            }
            if (!measurements.isEmpty()) {
              times.add(parseTimestamp(recordObj.get(timeColumn)));
              deviceIds.add(deviceId);
              typesList.add(types);
              valuesList.add(values);
              measurementsList.add(measurements);
            }
          }
          if (isFail) {
            // Keep the whole raw record; convertible cells of a failed row are still inserted.
            failedRecords.add(recordObj.stream().collect(Collectors.toList()));
          }
        });
    // Flush whatever remains after the last record.
    if (!deviceIds.isEmpty()) {
      writeAndEmptyDataSet(deviceIds, times, typesList, valuesList, measurementsList, 3);
      pointSize.set(0);
    }

    if (!failedRecords.isEmpty()) {
      writeFailedLinesFile(headerNames, failedFilePath, failedRecords);
    }
    if (Boolean.TRUE.equals(hasStarted.get())) {
      ioTPrinter.println("Import completely!");
    } else {
      ioTPrinter.println("No records!");
    }
  }
| |
  /**
   * Write CSV records laid out "aligned by device": column 0 is Time, column 1 is the device
   * path, and the remaining columns are measurement names. Buffers are flushed whenever the
   * device changes or the batch fills, so records for the same device are assumed to be
   * contiguous — a device reappearing later simply starts a new batch.
   *
   * @param headerNames the header names of CSV file
   * @param records the records of CSV file
   * @param failedFilePath the directory to save the failed files
   * @throws IllegalPathException if a header is not a valid path segment
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  private static void writeDataAlignedByDevice(
      List<String> headerNames, Stream<CSVRecord> records, String failedFilePath)
      throws IllegalPathException {
    HashMap<String, TSDataType> headerTypeMap = new HashMap<>();
    HashMap<String, String> headerNameMap = new HashMap<>();
    // deviceAndMeasurementNames is not needed in this layout, hence null.
    parseHeaders(headerNames, null, headerTypeMap, headerNameMap);

    // Device of the batch currently being accumulated (Atomic* only for lambda mutability).
    AtomicReference<String> deviceName = new AtomicReference<>(null);

    // Devices whose existing timeseries types were already fetched from the server.
    HashSet<String> typeQueriedDevice = new HashSet<>();

    // the data that interface need
    List<Long> times = new ArrayList<>();
    List<List<TSDataType>> typesList = new ArrayList<>();
    List<List<Object>> valuesList = new ArrayList<>();
    List<List<String>> measurementsList = new ArrayList<>();

    AtomicInteger pointSize = new AtomicInteger(0);

    ArrayList<List<Object>> failedRecords = new ArrayList<>();

    records.forEach(
        recordObj -> {
          // only run in first record
          if (deviceName.get() == null) {
            deviceName.set(recordObj.get(1)); // column 1 is the Device column
          } else if (!Objects.equals(deviceName.get(), recordObj.get(1))) {
            // if device changed
            writeAndEmptyDataSet(
                deviceName.get(), times, typesList, valuesList, measurementsList, 3);
            deviceName.set(recordObj.get(1));
            pointSize.set(0);
          } else if (pointSize.get() >= batchPointSize) {
            // insert a batch
            writeAndEmptyDataSet(
                deviceName.get(), times, typesList, valuesList, measurementsList, 3);
            pointSize.set(0);
          }

          // the data of the record
          ArrayList<TSDataType> types = new ArrayList<>();
          ArrayList<Object> values = new ArrayList<>();
          ArrayList<String> measurements = new ArrayList<>();

          AtomicReference<Boolean> isFail = new AtomicReference<>(false);

          // read data from record
          for (Map.Entry<String, String> headerNameEntry : headerNameMap.entrySet()) {
            // headerNameWithoutType is equal to headerName if the CSV column do not have data type.
            String headerNameWithoutType = headerNameEntry.getKey();
            String headerName = headerNameEntry.getValue();
            String value = recordObj.get(headerName);
            if (!"".equals(value)) {
              TSDataType type;
              // Get the data type directly if the CSV column have data type.
              if (!headerTypeMap.containsKey(headerNameWithoutType)) {
                boolean hasResult = false;
                // query the data type in iotdb
                if (!typeQueriedDevice.contains(deviceName.get())) {
                  if (headerTypeMap.isEmpty()) {
                    Set<String> devices = new HashSet<>();
                    devices.add(deviceName.get());
                    queryType(devices, headerTypeMap, deviceColumn);
                  }
                  typeQueriedDevice.add(deviceName.get());
                }
                // Still unknown after the server lookup: infer from this value and cache it.
                type = typeInfer(value);
                if (type != null) {
                  headerTypeMap.put(headerNameWithoutType, type);
                } else {
                  ioTPrinter.printf(
                      "Line '%s', column '%s': '%s' unknown type%n",
                      recordObj.getRecordNumber(), headerNameWithoutType, value);
                  isFail.set(true);
                }
              }
              type = headerTypeMap.get(headerNameWithoutType);
              if (type != null) {
                Object valueTrans = typeTrans(value, type);
                if (valueTrans == null) {
                  isFail.set(true);
                  ioTPrinter.printf(
                      "Line '%s', column '%s': '%s' can't convert to '%s'%n",
                      recordObj.getRecordNumber(), headerNameWithoutType, value, type);
                } else {
                  values.add(valueTrans);
                  measurements.add(headerNameWithoutType);
                  types.add(type);
                  pointSize.getAndIncrement();
                }
              }
            }
          }
          if (Boolean.TRUE.equals(isFail.get())) {
            // Keep the raw record for the ".failed" file; convertible cells still go in.
            failedRecords.add(recordObj.stream().collect(Collectors.toList()));
          }
          if (!measurements.isEmpty()) {
            times.add(parseTimestamp(recordObj.get(timeColumn)));
            typesList.add(types);
            valuesList.add(values);
            measurementsList.add(measurements);
          }
        });
    // Flush the final device's remaining batch.
    if (!times.isEmpty()) {
      writeAndEmptyDataSet(deviceName.get(), times, typesList, valuesList, measurementsList, 3);
      pointSize.set(0);
    }
    if (!failedRecords.isEmpty()) {
      writeFailedLinesFile(headerNames, failedFilePath, failedRecords);
    }
    ioTPrinter.println("Import completely!");
  }
| |
| private static void writeFailedLinesFile( |
| List<String> headerNames, String failedFilePath, ArrayList<List<Object>> failedRecords) { |
| int fileIndex = 0; |
| int from = 0; |
| int failedRecordsSize = failedRecords.size(); |
| int restFailedRecords = failedRecordsSize; |
| while (from < failedRecordsSize) { |
| int step = Math.min(restFailedRecords, linesPerFailedFile); |
| writeCsvFile( |
| headerNames, |
| failedRecords.subList(from, from + step), |
| failedFilePath + "_" + fileIndex++); |
| from += step; |
| restFailedRecords -= step; |
| } |
| } |
| |
  /**
   * Flush one device's buffered rows via insert(Aligned)RecordsOfOneDevice, then clear every
   * buffer. On a connection error the session is reopened and the write retried up to
   * {@code retryTime} more times; on a statement error the tool prints the cause, closes the
   * session and exits.
   *
   * NOTE(review): when all retries are exhausted the buffers are still cleared by the finally
   * block, so that batch is silently dropped — confirm this best-effort behavior is intended.
   *
   * @param device device path all rows belong to
   * @param times per-row timestamps (cleared on return)
   * @param typesList per-row value types (cleared on return)
   * @param valuesList per-row values (cleared on return)
   * @param measurementsList per-row measurement names (cleared on return)
   * @param retryTime remaining retries after a connection failure
   */
  private static void writeAndEmptyDataSet(
      String device,
      List<Long> times,
      List<List<TSDataType>> typesList,
      List<List<Object>> valuesList,
      List<List<String>> measurementsList,
      int retryTime) {
    try {
      if (Boolean.FALSE.equals(aligned)) {
        session.insertRecordsOfOneDevice(device, times, measurementsList, typesList, valuesList);
      } else {
        session.insertAlignedRecordsOfOneDevice(
            device, times, measurementsList, typesList, valuesList);
      }
    } catch (IoTDBConnectionException e) {
      if (retryTime > 0) {
        try {
          // Reconnect before retrying; if reopening fails the retry below will fail again
          // and decrement retryTime until it runs out.
          session.open();
        } catch (IoTDBConnectionException ex) {
          ioTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
        }
        writeAndEmptyDataSet(device, times, typesList, valuesList, measurementsList, --retryTime);
      }
    } catch (StatementExecutionException e) {
      // Server rejected the statement: not retryable, abort the whole import.
      ioTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
      try {
        session.close();
      } catch (IoTDBConnectionException ex) {
        // do nothing
      }
      System.exit(1);
    } finally {
      // Buffers are reused by the caller, so always empty them.
      times.clear();
      typesList.clear();
      valuesList.clear();
      measurementsList.clear();
    }
  }
| |
  /**
   * Flush buffered rows spanning multiple devices via insert(Aligned)Records, then clear every
   * buffer. Retry/exit semantics match the single-device overload above: reconnect-and-retry
   * on connection errors, print-and-exit on statement errors.
   *
   * NOTE(review): as in the single-device overload, exhausting retries clears the buffers in
   * the finally block and silently drops the batch — confirm intended.
   *
   * @param deviceIds per-row device paths (cleared on return)
   * @param times per-row timestamps (cleared on return)
   * @param typesList per-row value types (cleared on return)
   * @param valuesList per-row values (cleared on return)
   * @param measurementsList per-row measurement names (cleared on return)
   * @param retryTime remaining retries after a connection failure
   */
  private static void writeAndEmptyDataSet(
      List<String> deviceIds,
      List<Long> times,
      List<List<TSDataType>> typesList,
      List<List<Object>> valuesList,
      List<List<String>> measurementsList,
      int retryTime) {
    try {
      if (Boolean.FALSE.equals(aligned)) {
        session.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
      } else {
        session.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList);
      }
    } catch (IoTDBConnectionException e) {
      if (retryTime > 0) {
        try {
          session.open();
        } catch (IoTDBConnectionException ex) {
          ioTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
        }
        writeAndEmptyDataSet(
            deviceIds, times, typesList, valuesList, measurementsList, --retryTime);
      }
    } catch (StatementExecutionException e) {
      ioTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
      try {
        session.close();
      } catch (IoTDBConnectionException ex) {
        // do nothing
      }
      System.exit(1);
    } finally {
      deviceIds.clear();
      times.clear();
      typesList.clear();
      valuesList.clear();
      measurementsList.clear();
    }
  }
| |
| /** |
| * read data from the CSV file |
| * |
| * @param path |
| * @return CSVParser csv parser |
| * @throws IOException when reading the csv file failed. |
| */ |
| private static CSVParser readCsvFile(String path) throws IOException { |
| return CSVFormat.Builder.create(CSVFormat.DEFAULT) |
| .setHeader() |
| .setSkipHeaderRecord(true) |
| .setQuote('`') |
| .setEscape('\\') |
| .setIgnoreEmptyLines(true) |
| .build() |
| .parse(new InputStreamReader(new FileInputStream(path))); |
| } |
| |
  /**
   * Parse the CSV header row: record the actual Time/Device column names, strip optional
   * "(TYPE)" suffixes into {@code headerTypeMap}, map the stripped name back to the raw column
   * name in {@code headerNameMap}, and (for the time-aligned layout) group measurement names
   * per device into {@code deviceAndMeasurementNames}.
   *
   * @param headerNames raw header names from the CSV file
   * @param deviceAndMeasurementNames device -> measurement names; null for the by-device layout
   * @param headerTypeMap stripped header name -> declared TSDataType (filled only when a
   *     "(TYPE)" suffix is present)
   * @param headerNameMap stripped header name -> raw CSV column name
   * @throws IllegalPathException if a header is not a valid timeseries path
   */
  @SuppressWarnings(
      "squid:S135") // ignore for loops should not contain more than a single "break" or "continue"
  // statement
  private static void parseHeaders(
      List<String> headerNames,
      @Nullable HashMap<String, List<String>> deviceAndMeasurementNames,
      HashMap<String, TSDataType> headerTypeMap,
      HashMap<String, String> headerNameMap)
      throws IllegalPathException {
    // Matches the text between '(' and ')', e.g. "INT32" in "root.sg.d1.s1(INT32)".
    String regex = "(?<=\\()\\S+(?=\\))";
    Pattern pattern = Pattern.compile(regex);
    for (String headerName : headerNames) {
      // Remember the file's exact spelling of the Time/Device columns for later lookups.
      if ("Time".equalsIgnoreCase(filterBomHeader(headerName))) {
        timeColumn = headerName;
        continue;
      } else if ("Device".equalsIgnoreCase(headerName)) {
        deviceColumn = headerName;
        continue;
      }
      Matcher matcher = pattern.matcher(headerName);
      String type;
      String headerNameWithoutType;
      if (matcher.find()) {
        // Column declares its type inline: strip "(TYPE)" and any whitespace from the name.
        type = matcher.group();
        headerNameWithoutType = headerName.replace("(" + type + ")", "").replaceAll("\\s+", "");
        headerNameMap.put(headerNameWithoutType, headerName);
        headerTypeMap.put(headerNameWithoutType, getType(type));
      } else {
        headerNameWithoutType = headerName;
        headerNameMap.put(headerName, headerName);
      }
      // Last path node is the measurement; everything before it is the device.
      String[] split = PathUtils.splitPathToDetachedNodes(headerNameWithoutType);
      String measurementName = split[split.length - 1];
      String deviceName = StringUtils.join(Arrays.copyOfRange(split, 0, split.length - 1), '.');
      if (deviceAndMeasurementNames != null) {
        deviceAndMeasurementNames.putIfAbsent(deviceName, new ArrayList<>());
        deviceAndMeasurementNames.get(deviceName).add(measurementName);
      }
    }
  }
| |
  /**
   * Query IoTDB ("show timeseries device.*") for the data types of each device's existing
   * timeseries and record them in {@code headerTypeMap}. Keys are full timeseries paths for
   * the time-aligned layout ("Time"), or bare measurement names for the by-device layout.
   * On any query error the tool prints the cause, closes the session and exits.
   *
   * NOTE(review): sessionDataSet is never closed after iteration — consider
   * closeOperationHandle(); verify against the SessionDataSet API.
   *
   * @param deviceNames devices whose timeseries types are needed
   * @param headerTypeMap output map, keyed as described above
   * @param alignedType "Time" for the time-aligned layout, or the Device column name
   */
  private static void queryType(
      Set<String> deviceNames, HashMap<String, TSDataType> headerTypeMap, String alignedType) {
    for (String deviceName : deviceNames) {
      String sql = "show timeseries " + deviceName + ".*";
      SessionDataSet sessionDataSet = null;
      try {
        sessionDataSet = session.executeQueryStatement(sql);
        // Locate the Timeseries / DataType columns by name rather than fixed position.
        int tsIndex = sessionDataSet.getColumnNames().indexOf(ColumnHeaderConstant.TIMESERIES);
        int dtIndex = sessionDataSet.getColumnNames().indexOf(ColumnHeaderConstant.DATATYPE);
        while (sessionDataSet.hasNext()) {
          RowRecord rowRecord = sessionDataSet.next();
          List<Field> fields = rowRecord.getFields();
          String timeseries = fields.get(tsIndex).getStringValue();
          String dataType = fields.get(dtIndex).getStringValue();
          if (Objects.equals(alignedType, "Time")) {
            // Time-aligned layout: key by the full timeseries path.
            headerTypeMap.put(timeseries, getType(dataType));
          } else if (Objects.equals(alignedType, deviceColumn)) {
            // By-device layout: key by the bare measurement name (last path node).
            String[] split = PathUtils.splitPathToDetachedNodes(timeseries);
            String measurement = split[split.length - 1];
            headerTypeMap.put(measurement, getType(dataType));
          }
        }
      } catch (StatementExecutionException | IllegalPathException | IoTDBConnectionException e) {
        ioTPrinter.println(
            "Meet error when query the type of timeseries because " + e.getMessage());
        try {
          session.close();
        } catch (IoTDBConnectionException ex) {
          // do nothing
        }
        System.exit(1);
      }
    }
  }
| |
| /** |
| * return the TSDataType |
| * |
| * @param typeStr |
| * @return |
| */ |
| private static TSDataType getType(String typeStr) { |
| switch (typeStr) { |
| case "TEXT": |
| return TEXT; |
| case "BOOLEAN": |
| return BOOLEAN; |
| case "INT32": |
| return INT32; |
| case "INT64": |
| return INT64; |
| case "FLOAT": |
| return FLOAT; |
| case "DOUBLE": |
| return DOUBLE; |
| default: |
| return null; |
| } |
| } |
| |
| /** |
| * if data type of timeseries is not defined in headers of schema, this method will be called to |
| * do type inference |
| * |
| * @param strValue |
| * @return |
| */ |
| private static TSDataType typeInfer(String strValue) { |
| if (strValue.contains("\"")) { |
| return TEXT; |
| } |
| if (isBoolean(strValue)) { |
| return TYPE_INFER_KEY_DICT.get(DATATYPE_BOOLEAN); |
| } else if (isNumber(strValue)) { |
| if (!strValue.contains(TsFileConstant.PATH_SEPARATOR)) { |
| if (isConvertFloatPrecisionLack(StringUtils.trim(strValue))) { |
| return TYPE_INFER_KEY_DICT.get(DATATYPE_LONG); |
| } |
| return TYPE_INFER_KEY_DICT.get(DATATYPE_INT); |
| } else { |
| return TYPE_INFER_KEY_DICT.get(DATATYPE_FLOAT); |
| } |
| } else if (DATATYPE_NULL.equals(strValue) || DATATYPE_NULL.toUpperCase().equals(strValue)) { |
| return null; |
| // "NaN" is returned if the NaN Literal is given in Parser |
| } else if (DATATYPE_NAN.equals(strValue)) { |
| return TYPE_INFER_KEY_DICT.get(DATATYPE_NAN); |
| } else { |
| return TSDataType.TEXT; |
| } |
| } |
| |
| static boolean isNumber(String s) { |
| if (s == null || s.equals(DATATYPE_NAN)) { |
| return false; |
| } |
| try { |
| Double.parseDouble(s); |
| } catch (NumberFormatException e) { |
| return false; |
| } |
| return true; |
| } |
| |
| private static boolean isBoolean(String s) { |
| return s.equalsIgnoreCase(SqlConstant.BOOLEAN_TRUE) |
| || s.equalsIgnoreCase(SqlConstant.BOOLEAN_FALSE); |
| } |
| |
| private static boolean isConvertFloatPrecisionLack(String s) { |
| return Long.parseLong(s) > (2 << 24); |
| } |
| |
| /** |
| * @param value |
| * @param type |
| * @return |
| */ |
| private static Object typeTrans(String value, TSDataType type) { |
| try { |
| switch (type) { |
| case TEXT: |
| if (value.startsWith("\"") && value.endsWith("\"")) { |
| return value.substring(1, value.length() - 1); |
| } |
| return value; |
| case BOOLEAN: |
| if (!"true".equalsIgnoreCase(value) && !"false".equalsIgnoreCase(value)) { |
| return null; |
| } |
| return Boolean.parseBoolean(value); |
| case INT32: |
| return Integer.parseInt(value); |
| case INT64: |
| return Long.parseLong(value); |
| case FLOAT: |
| return Float.parseFloat(value); |
| case DOUBLE: |
| return Double.parseDouble(value); |
| default: |
| return null; |
| } |
| } catch (NumberFormatException e) { |
| return null; |
| } |
| } |
| |
| private static long parseTimestamp(String str) { |
| long timestamp; |
| try { |
| timestamp = Long.parseLong(str); |
| } catch (NumberFormatException e) { |
| timestamp = DateTimeUtils.convertDatetimeStrToLong(str, zoneId, timestampPrecision); |
| } |
| return timestamp; |
| } |
| |
| private static String filterBomHeader(String s) { |
| byte[] bom = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF}; |
| byte[] bytes = Arrays.copyOf(s.getBytes(), 3); |
| if (Arrays.equals(bom, bytes)) { |
| return s.substring(1); |
| } |
| return s; |
| } |
| } |