blob: 13805b93ba5b33b7e8e7b78f41a38db420c2c265 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.isession.SessionDataSet.DataIterator;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* Migrate all data belongs to a path from one IoTDB to another IoTDB Each thread migrate one
* series, the concurrent thread can be configured by CONCURRENCY
*
* <p>This example is migrating all timeseries from a local IoTDB with 6667 port to a local IoTDB
* with 6668 port
*/
public class DataMigrationExample {
private static final Logger LOGGER = LoggerFactory.getLogger(DataMigrationExample.class);
// used to read data from the source IoTDB
private static SessionPool readerPool;
// used to write data into the destination IoTDB
private static SessionPool writerPool;
// concurrent thread of loading timeseries data
private static final int CONCURRENCY = 5;
public static void main(String[] args)
throws IoTDBConnectionException, StatementExecutionException, ExecutionException,
InterruptedException {
// the thread used for dataMigration must be smaller than session pool size
ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENCY);
String path = "root.**";
if (args.length != 0) {
path = args[0];
}
readerPool = new SessionPool("127.0.0.1", 6667, "root", "root", CONCURRENCY);
writerPool = new SessionPool("127.0.0.1", 6668, "root", "root", CONCURRENCY);
SessionDataSetWrapper deviceDataSet = readerPool.executeQueryStatement("count devices " + path);
DataIterator deviceIter = deviceDataSet.iterator();
int total;
if (deviceIter.next()) {
total = deviceIter.getInt(1);
LOGGER.info("Total devices: {}", total);
} else {
LOGGER.error("Can not get devices schema");
System.exit(1);
}
readerPool.closeResultSet(deviceDataSet);
deviceDataSet = readerPool.executeQueryStatement("show devices " + path);
deviceIter = deviceDataSet.iterator();
List<Future<Void>> futureList = new ArrayList<>();
int count = 0;
while (deviceIter.next()) {
count++;
Future<Void> future =
executorService.submit(new LoadThread(count, deviceIter.getString("Device")));
futureList.add(future);
}
readerPool.closeResultSet(deviceDataSet);
for (Future<Void> future : futureList) {
future.get();
}
executorService.shutdown();
readerPool.close();
writerPool.close();
}
static class LoadThread implements Callable<Void> {
String device;
Tablet tablet;
int i;
public LoadThread(int i, String device) {
this.i = i;
this.device = device;
}
@Override
public Void call() {
SessionDataSetWrapper dataSet = null;
long startTime = System.currentTimeMillis();
try {
dataSet = readerPool.executeQueryStatement(String.format("select * from %s", device));
DataIterator dataIter = dataSet.iterator();
List<String> columnNameList = dataIter.getColumnNameList();
List<String> columnTypeList = dataIter.getColumnTypeList();
List<MeasurementSchema> schemaList = new ArrayList<>();
for (int j = 1; j < columnNameList.size(); j++) {
PartialPath currentPath = new PartialPath(columnNameList.get(j));
schemaList.add(
new MeasurementSchema(
currentPath.getMeasurement(), TSDataType.valueOf(columnTypeList.get(j))));
}
tablet = new Tablet(device, schemaList, 300000);
while (dataIter.next()) {
int row = tablet.rowSize++;
tablet.timestamps[row] = dataIter.getLong(1);
for (int j = 0; j < schemaList.size(); ++j) {
if (dataIter.isNull(j + 2)) {
tablet.addValue(schemaList.get(j).getMeasurementId(), row, null);
continue;
}
switch (schemaList.get(j).getType()) {
case BOOLEAN:
tablet.addValue(
schemaList.get(j).getMeasurementId(), row, dataIter.getBoolean(j + 2));
break;
case INT32:
tablet.addValue(schemaList.get(j).getMeasurementId(), row, dataIter.getInt(j + 2));
break;
case INT64:
tablet.addValue(schemaList.get(j).getMeasurementId(), row, dataIter.getLong(j + 2));
break;
case FLOAT:
tablet.addValue(
schemaList.get(j).getMeasurementId(), row, dataIter.getFloat(j + 2));
break;
case DOUBLE:
tablet.addValue(
schemaList.get(j).getMeasurementId(), row, dataIter.getDouble(j + 2));
break;
case TEXT:
tablet.addValue(
schemaList.get(j).getMeasurementId(), row, dataIter.getString(j + 2));
break;
default:
LOGGER.info("Migration of this type of data is not supported");
}
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
writerPool.insertTablet(tablet, true);
tablet.reset();
}
}
if (tablet.rowSize != 0) {
writerPool.insertTablet(tablet);
tablet.reset();
}
} catch (Exception e) {
LOGGER.error("Loading the {}-th device: {} failed {}", i, device, e.getMessage());
return null;
} finally {
if (dataSet != null) {
readerPool.closeResultSet(dataSet);
}
long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
LOGGER.info("migrate device :{} using {} ms", device, totalTime);
}
LOGGER.info("Loading the {}-th device: {} success", i, device);
return null;
}
}
}