blob: 986b019ba47e49cd1759051de1ba07cf47761abd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.cluster.common;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
import org.apache.iotdb.cluster.log.logtypes.LargeTestLog;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class TestUtils {
public static long TEST_TIME_OUT_MS = 200;
public static ByteBuffer seralizePartitionTable = getPartitionTable(3).serialize();
private TestUtils() {
// util class
}
public static Node getNode(int nodeNum) {
Node node = new Node();
node.setInternalIp("192.168.0." + nodeNum);
node.setMetaPort(ClusterDescriptor.getInstance().getConfig().getInternalMetaPort());
node.setDataPort(ClusterDescriptor.getInstance().getConfig().getInternalDataPort());
node.setNodeIdentifier(nodeNum);
node.setClientPort(IoTDBDescriptor.getInstance().getConfig().getRpcPort());
node.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
return node;
}
public static RaftNode getRaftNode(int nodeNum, int raftId) {
return new RaftNode(getNode(nodeNum), raftId);
}
public static List<Log> prepareNodeLogs(int logNum) {
List<Log> logList = new ArrayList<>();
for (int i = 0; i < logNum; i++) {
AddNodeLog log = new AddNodeLog();
log.setNewNode(getNode(i));
log.setPartitionTable(seralizePartitionTable);
log.setCurrLogIndex(i);
log.setCurrLogTerm(i);
logList.add(log);
}
return logList;
}
public static StartUpStatus getStartUpStatus() {
StartUpStatus startUpStatus = new StartUpStatus();
startUpStatus.setPartitionInterval(
IoTDBDescriptor.getInstance().getConfig().getPartitionInterval());
startUpStatus.setHashSalt(ClusterConstant.HASH_SALT);
startUpStatus.setReplicationNumber(
ClusterDescriptor.getInstance().getConfig().getReplicationNum());
startUpStatus.setClusterName(ClusterDescriptor.getInstance().getConfig().getClusterName());
startUpStatus.setMultiRaftFactor(
ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor());
List<Node> seedNodeList = new ArrayList<>();
for (int i = 0; i < 100; i += 10) {
seedNodeList.add(getNode(i));
}
startUpStatus.setSeedNodeList(seedNodeList);
return startUpStatus;
}
public static List<Log> prepareTestLogs(int logNum) {
List<Log> logList = new ArrayList<>();
for (int i = 0; i < logNum; i++) {
Log log = new EmptyContentLog();
log.setCurrLogIndex(i);
log.setCurrLogTerm(i);
logList.add(log);
}
return logList;
}
public static List<Log> prepareLargeTestLogs(int logNum) {
List<Log> logList = new ArrayList<>();
for (int i = 0; i < logNum; i++) {
Log log = new LargeTestLog();
log.setCurrLogIndex(i);
log.setCurrLogTerm(i);
log.setByteSize(8192);
logList.add(log);
}
return logList;
}
public static List<TimeValuePair> getTestTimeValuePairs(
int offset, int size, int step, TSDataType dataType) {
List<TimeValuePair> ret = new ArrayList<>(size);
long currTime = offset;
for (int i = 0; i < size; i++) {
TsPrimitiveType value = TsPrimitiveType.getByType(dataType, currTime);
TimeValuePair pair = new TimeValuePair(currTime, value);
currTime += step;
ret.add(pair);
}
return ret;
}
public static List<BatchData> getTestBatches(
int offset, int size, int batchSize, int step, TSDataType dataType) {
List<BatchData> ret = new ArrayList<>(size);
long currTime = offset;
BatchData currBatch = null;
for (int i = 0; i < size; i++) {
if (i % batchSize == 0) {
if (currBatch != null) {
ret.add(currBatch);
}
currBatch = new BatchData(dataType);
}
TsPrimitiveType value = TsPrimitiveType.getByType(dataType, currTime);
currBatch.putAnObject(currTime, value.getValue());
currTime += step;
}
if (currBatch != null) {
ret.add(currBatch);
}
return ret;
}
public static PartitionTable getPartitionTable(int nodeNum) {
List<Node> nodes = new ArrayList<>();
for (int i = 0; i < nodeNum; i++) {
nodes.add(getNode(i));
}
return new SlotPartitionTable(nodes, getNode(0));
}
public static String getTestSg(int i) {
return "root.test" + i;
}
public static String getTestSeries(int sgNum, int seriesNum) {
return getTestSg(sgNum) + "." + getTestMeasurement(seriesNum);
}
public static String getTestMeasurement(int seriesNum) {
return "s" + seriesNum;
}
public static IMeasurementSchema getTestMeasurementSchema(int seriesNum) {
TSDataType dataType = TSDataType.DOUBLE;
TSEncoding encoding = IoTDBDescriptor.getInstance().getConfig().getDefaultDoubleEncoding();
return new MeasurementSchema(
TestUtils.getTestMeasurement(seriesNum),
dataType,
encoding,
CompressionType.UNCOMPRESSED,
Collections.emptyMap());
}
public static IMeasurementMNode getTestMeasurementMNode(int seriesNum) {
TSDataType dataType = TSDataType.DOUBLE;
TSEncoding encoding = IoTDBDescriptor.getInstance().getConfig().getDefaultDoubleEncoding();
IMeasurementSchema measurementSchema =
new MeasurementSchema(
TestUtils.getTestMeasurement(seriesNum),
dataType,
encoding,
CompressionType.UNCOMPRESSED,
Collections.emptyMap());
return new MeasurementMNode(
null, measurementSchema.getMeasurementId(), measurementSchema, null);
}
public static TimeseriesSchema getTestTimeSeriesSchema(int sgNum, int seriesNum) {
TSDataType dataType = TSDataType.DOUBLE;
TSEncoding encoding = IoTDBDescriptor.getInstance().getConfig().getDefaultDoubleEncoding();
return new TimeseriesSchema(
TestUtils.getTestSeries(sgNum, seriesNum),
dataType,
encoding,
CompressionType.UNCOMPRESSED,
Collections.emptyMap());
}
public static BatchData genBatchData(TSDataType dataType, int offset, int size) {
BatchData batchData = new BatchData(dataType);
for (long i = offset; i < offset + size; i++) {
switch (dataType) {
case DOUBLE:
batchData.putDouble(i, i * 1.0);
break;
case TEXT:
batchData.putBinary(i, new Binary(String.valueOf(i)));
break;
case INT64:
batchData.putLong(i, i);
break;
case INT32:
batchData.putInt(i, (int) i);
break;
case FLOAT:
batchData.putFloat(i, i * 1.0f);
break;
case BOOLEAN:
batchData.putBoolean(i, (i % 2) == 1);
break;
}
}
return batchData;
}
public static boolean batchEquals(BatchData batchA, BatchData batchB) {
if (batchA == batchB) {
return true;
}
if (batchA == null || batchB == null) {
return false;
}
if (!batchA.getDataType().equals(batchB.getDataType())) {
return false;
}
if (batchA.length() != batchB.length()) {
return false;
}
while (batchA.hasCurrent()) {
if (!batchB.hasCurrent()) {
return false;
}
long timeA = batchA.currentTime();
Object valueA = batchA.currentValue();
long timeB = batchB.currentTime();
Object valueB = batchB.currentValue();
if (timeA != timeB || !valueA.equals(valueB)) {
return false;
}
batchA.next();
batchB.next();
}
return true;
}
public static void prepareData()
throws QueryProcessException, StorageGroupNotSetException, StorageEngineException,
IllegalPathException {
InsertRowPlan insertPlan = new InsertRowPlan();
// data for raw data query and aggregation
// 10 devices (storage groups)
for (int j = 0; j < 10; j++) {
insertPlan.setPrefixPath(new PartialPath(getTestSg(j)));
String[] measurements = new String[10];
IMeasurementMNode[] mNodes = new IMeasurementMNode[10];
// 10 series each device, all double
for (int i = 0; i < 10; i++) {
measurements[i] = getTestMeasurement(i);
mNodes[i] = TestUtils.getTestMeasurementMNode(i);
}
insertPlan.setMeasurements(measurements);
insertPlan.setNeedInferType(true);
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
// the first sequential file
for (int i = 10; i < 20; i++) {
insertPlan.setTime(i);
Object[] values = new Object[10];
for (int k = 0; k < 10; k++) {
values[k] = String.valueOf(i);
}
insertPlan.setValues(values);
insertPlan.setMeasurementMNodes(mNodes);
PlanExecutor planExecutor = new PlanExecutor();
planExecutor.processNonQuery(insertPlan);
}
StorageEngine.getInstance().syncCloseAllProcessor();
// the first unsequential file, not overlapped with the sequential file
for (int i = 0; i < 10; i++) {
insertPlan.setTime(i);
Object[] values = new Object[10];
for (int k = 0; k < 10; k++) {
values[k] = String.valueOf(i);
}
insertPlan.setValues(values);
insertPlan.setMeasurementMNodes(mNodes);
PlanExecutor planExecutor = new PlanExecutor();
planExecutor.processNonQuery(insertPlan);
}
StorageEngine.getInstance().syncCloseAllProcessor();
// the second unsequential file, overlapped with the sequential file
for (int i = 10; i < 20; i++) {
insertPlan.setTime(i);
Object[] values = new Object[10];
for (int k = 0; k < 10; k++) {
values[k] = String.valueOf(i);
}
insertPlan.setValues(values);
insertPlan.setMeasurementMNodes(mNodes);
PlanExecutor planExecutor = new PlanExecutor();
planExecutor.processNonQuery(insertPlan);
}
StorageEngine.getInstance().syncCloseAllProcessor();
}
// data for fill
insertPlan.setPrefixPath(new PartialPath(getTestSg(0)));
String[] measurements = new String[] {getTestMeasurement(10)};
IMeasurementMNode[] schemas = new IMeasurementMNode[] {TestUtils.getTestMeasurementMNode(10)};
insertPlan.setMeasurements(measurements);
insertPlan.setNeedInferType(true);
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
for (int i : new int[] {0, 10}) {
insertPlan.setTime(i);
Object[] values = new Object[] {String.valueOf(i)};
insertPlan.setValues(values);
insertPlan.setMeasurementMNodes(schemas);
PlanExecutor planExecutor = new PlanExecutor();
planExecutor.processNonQuery(insertPlan);
}
}
/**
* The TsFileResource's path should be consist with the {@link
* org.apache.iotdb.db.utils.FilePathUtils#splitTsFilePath(TsFileResource)}
*/
public static List<TsFileResource> prepareTsFileResources(
int sgNum, int fileNum, int seriesNum, int ptNum, boolean asHardLink)
throws IOException, WriteProcessException {
List<TsFileResource> ret = new ArrayList<>();
for (int i = 0; i < fileNum; i++) {
String fileName =
"target"
+ File.separator
+ "data"
+ File.separator
+ String.format(
TestUtils.getTestSg(sgNum)
+ File.separator
+ 0
+ File.separator
+ 0
+ File.separator
+ "0-%d-0-0"
+ TsFileConstant.TSFILE_SUFFIX,
i);
if (asHardLink) {
fileName = fileName + ".0_0";
}
File file = new File(fileName);
file.getParentFile().mkdirs();
try (TsFileWriter writer = new TsFileWriter(file)) {
for (int k = 0; k < seriesNum; k++) {
IMeasurementSchema schema = getTestMeasurementSchema(k);
writer.registerTimeseries(new Path(getTestSg(sgNum), schema.getMeasurementId()), schema);
}
for (int j = 0; j < ptNum; j++) {
long timestamp = i * ptNum + j;
TSRecord record = new TSRecord(timestamp, getTestSg(sgNum));
for (int k = 0; k < seriesNum; k++) {
IMeasurementSchema schema = getTestMeasurementSchema(k);
DataPoint dataPoint =
DataPoint.getDataPoint(
schema.getType(), schema.getMeasurementId(), String.valueOf(k));
record.addTuple(dataPoint);
}
writer.write(record);
}
}
TsFileResource resource = new TsFileResource(file);
resource.updateStartTime(TestUtils.getTestSg(sgNum), i * ptNum);
resource.updateEndTime(TestUtils.getTestSg(sgNum), (i + 1) * ptNum - 1);
resource.setMaxPlanIndex(i);
resource.setMinPlanIndex(i);
resource.serialize();
ret.add(resource);
}
return ret;
}
}