/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
 */

package org.apache.iotdb.confignode.it.utils;

import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.PublicBAOS;

import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.junit.Assert.assertTrue;

public class ConfigNodeTestUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeTestUtils.class);
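
  /**
   * Assert that every ConfigNodeLocation and DataNodeLocation reported by the cluster matches one
   * of the ConfigNodeWrappers / DataNodeWrappers started by the test environment, comparing the IP
   * and the relevant ports of each node.
   */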
public static void checkNodeConfig(
List<TConfigNodeLocation> configNodeList,
List<TDataNodeLocation> dataNodeList,
List<ConfigNodeWrapper> configNodeWrappers,
List<DataNodeWrapper> dataNodeWrappers) {
// check ConfigNode
for (TConfigNodeLocation configNodeLocation : configNodeList) {
boolean found = false;
for (ConfigNodeWrapper configNodeWrapper : configNodeWrappers) {
if (configNodeWrapper.getIp().equals(configNodeLocation.getInternalEndPoint().getIp())
&& configNodeWrapper.getPort() == configNodeLocation.getInternalEndPoint().getPort()
&& configNodeWrapper.getConsensusPort()
== configNodeLocation.getConsensusEndPoint().getPort()) {
found = true;
break;
}
}
assertTrue(found);
}
// check DataNode
for (TDataNodeLocation dataNodeLocation : dataNodeList) {
boolean found = false;
for (DataNodeWrapper dataNodeWrapper : dataNodeWrappers) {
if (dataNodeWrapper.getIp().equals(dataNodeLocation.getClientRpcEndPoint().getIp())
&& dataNodeWrapper.getPort() == dataNodeLocation.getClientRpcEndPoint().getPort()
&& dataNodeWrapper.getInternalPort() == dataNodeLocation.getInternalEndPoint().getPort()
&& dataNodeWrapper.getSchemaRegionConsensusPort()
== dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort()
&& dataNodeWrapper.getDataRegionConsensusPort()
== dataNodeLocation.getDataRegionConsensusEndPoint().getPort()) {
found = true;
break;
}
}
assertTrue(found);
}
  }

  /** Generate a PathPatternTree from the given paths and serialize it into a ByteBuffer. */
public static ByteBuffer generatePatternTreeBuffer(String[] paths)
throws IllegalPathException, IOException {
PathPatternTree patternTree = new PathPatternTree();
for (String path : paths) {
patternTree.appendPathPattern(new PartialPath(path));
}
patternTree.constructTree();
PublicBAOS baos = new PublicBAOS();
patternTree.serialize(baos);
return ByteBuffer.wrap(baos.toByteArray());
}
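
  // A minimal usage sketch for generatePatternTreeBuffer; the path patterns below are
  // illustrative only:
  //
  //   ByteBuffer buffer =
  //       ConfigNodeTestUtils.generatePatternTreeBuffer(
  //           new String[] {"root.sg.d1.s1", "root.sg.d2.*"});

  /**
   * Construct a partition slots map for the given Database: the Database name is mapped to a
   * SeriesPartitionSlot -> TTimeSlotList table covering series slots [seriesSlotStart,
   * seriesSlotEnd) and time slots [timeSlotStart, timeSlotEnd), where each TTimePartitionSlot
   * starts at {@code timeSlotIndex * timePartitionInterval}.
   */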
public static Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> constructPartitionSlotsMap(
String storageGroup,
int seriesSlotStart,
int seriesSlotEnd,
long timeSlotStart,
long timeSlotEnd,
long timePartitionInterval) {
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> result = new HashMap<>();
result.put(storageGroup, new HashMap<>());
for (int i = seriesSlotStart; i < seriesSlotEnd; i++) {
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
result
.get(storageGroup)
.put(seriesPartitionSlot, new TTimeSlotList().setTimePartitionSlots(new ArrayList<>()));
for (long j = timeSlotStart; j < timeSlotEnd; j++) {
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(j * timePartitionInterval);
result
.get(storageGroup)
.get(seriesPartitionSlot)
.getTimePartitionSlots()
.add(timePartitionSlot);
}
}
return result;
}
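
  /**
   * Assert that the given DataPartition table contains the Database and exactly the expected
   * SeriesPartitionSlots [seriesSlotStart, seriesSlotEnd) and, for each of them, exactly the
   * expected TimePartitionSlots derived from [timeSlotStart, timeSlotEnd) and
   * timePartitionInterval.
   */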
public static void checkDataPartitionTable(
String storageGroup,
int seriesSlotStart,
int seriesSlotEnd,
long timeSlotStart,
long timeSlotEnd,
long timePartitionInterval,
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
dataPartitionTable) {
// Check the existence of StorageGroup
Assert.assertTrue(dataPartitionTable.containsKey(storageGroup));
// Check the number of SeriesPartitionSlot
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
seriesPartitionTable = dataPartitionTable.get(storageGroup);
Assert.assertEquals(seriesSlotEnd - seriesSlotStart, seriesPartitionTable.size());
for (int i = seriesSlotStart; i < seriesSlotEnd; i++) {
// Check the existence of SeriesPartitionSlot
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
Assert.assertTrue(seriesPartitionTable.containsKey(seriesPartitionSlot));
// Check the number of TimePartitionSlot
Map<TTimePartitionSlot, List<TConsensusGroupId>> timePartitionTable =
seriesPartitionTable.get(seriesPartitionSlot);
Assert.assertEquals(timeSlotEnd - timeSlotStart, timePartitionTable.size());
for (long j = timeSlotStart; j < timeSlotEnd; j++) {
// Check the existence of TimePartitionSlot
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(j * timePartitionInterval);
Assert.assertTrue(timePartitionTable.containsKey(timePartitionSlot));
}
}
}
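
  /**
   * Fetch the DataPartition table for the given slot ranges from the leader ConfigNode, retrying
   * up to five times; the enclosing test fails if no successful response is obtained.
   */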
public static TDataPartitionTableResp getDataPartitionWithRetry(
String database,
int seriesSlotStart,
int seriesSlotEnd,
long timeSlotStart,
long timeSlotEnd,
long timePartitionInterval)
throws InterruptedException {
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
database,
seriesSlotStart,
seriesSlotEnd,
timeSlotStart,
timeSlotEnd,
timePartitionInterval);
TDataPartitionTableResp dataPartitionTableResp;
TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
for (int retry = 0; retry < 5; retry++) {
      // Build a new client for each attempt since the connection may be unstable
try (SyncConfigNodeIServiceClient configNodeClient =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
dataPartitionTableResp = configNodeClient.getDataPartitionTable(dataPartitionReq);
if (dataPartitionTableResp != null
&& dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return dataPartitionTableResp;
}
} catch (Exception e) {
        // Sleep briefly and retry to tolerate transient request timeouts
        LOGGER.error("Failed to get DataPartition table, retrying", e);
TimeUnit.SECONDS.sleep(1);
}
}
Assert.fail("Failed to create DataPartition");
return null;
}
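
  /**
   * Get or create the DataPartition for the given slot ranges through the leader ConfigNode,
   * retrying up to five times. On success the partition table is re-read and validated with
   * {@link #checkDataPartitionTable}; the enclosing test fails if no successful response is
   * obtained.
   */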
public static TDataPartitionTableResp getOrCreateDataPartitionWithRetry(
String database,
int seriesSlotStart,
int seriesSlotEnd,
long timeSlotStart,
long timeSlotEnd,
long timePartitionInterval)
throws InterruptedException {
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
database,
seriesSlotStart,
seriesSlotEnd,
timeSlotStart,
timeSlotEnd,
timePartitionInterval);
TDataPartitionTableResp dataPartitionTableResp;
TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
for (int retry = 0; retry < 5; retry++) {
      // Build a new client for each attempt since the connection may be unstable
try (SyncConfigNodeIServiceClient configNodeClient =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
if (dataPartitionTableResp != null
&& dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
ConfigNodeTestUtils.checkDataPartitionTable(
database,
seriesSlotStart,
seriesSlotEnd,
timeSlotStart,
timeSlotEnd,
timePartitionInterval,
configNodeClient.getDataPartitionTable(dataPartitionReq).getDataPartitionTable());
return dataPartitionTableResp;
}
} catch (Exception e) {
        // Sleep briefly and retry to tolerate transient request timeouts
        LOGGER.error("Failed to get or create DataPartition table, retrying", e);
TimeUnit.SECONDS.sleep(1);
}
}
Assert.fail("Failed to create DataPartition");
return null;
}
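
  // A minimal usage sketch combining getOrCreateDataPartitionWithRetry and countDataPartition;
  // the database name, slot ranges and partition interval below are illustrative only:
  //
  //   TDataPartitionTableResp resp =
  //       ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
  //           "root.database", 0, 10, 0, 10, 604800000L);
  //   Map<TConsensusGroupId, Integer> counter =
  //       ConfigNodeTestUtils.countDataPartition(
  //           resp.getDataPartitionTable().get("root.database"));

  /**
   * Count how many (SeriesPartitionSlot, TimePartitionSlot) pairs are allocated to each
   * ConsensusGroup in the given DataPartition map of a single Database.
   */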
public static Map<TConsensusGroupId, Integer> countDataPartition(
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
dataPartitionMap) {
Map<TConsensusGroupId, AtomicInteger> counter = new ConcurrentHashMap<>();
dataPartitionMap.forEach(
((seriesPartitionSlot, timePartitionSlotMap) ->
timePartitionSlotMap.forEach(
((timePartitionSlot, consensusGroupIds) ->
consensusGroupIds.forEach(
(consensusGroupId ->
counter
.computeIfAbsent(consensusGroupId, empty -> new AtomicInteger(0))
.incrementAndGet()))))));
return counter.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get()));
}
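
  /**
   * Build a TConfigNodeLocation with the given node id and the internal and consensus endpoints of
   * the given ConfigNodeWrapper.
   */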
public static TConfigNodeLocation generateTConfigNodeLocation(
int nodeId, ConfigNodeWrapper configNodeWrapper) {
return new TConfigNodeLocation(
nodeId,
new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()),
new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getConsensusPort()));
  }

  /**
   * Generate a ConfigNode register request with the given ConfigNodeLocation and default
   * configurations.
*
* @param clusterName The target cluster name
* @param configNodeWrapper The given ConfigNode
* @return TConfigNodeRegisterReq for the given ConfigNode
*/
public static TConfigNodeRegisterReq generateTConfigNodeRegisterReq(
String clusterName, ConfigNodeWrapper configNodeWrapper) {
return new TConfigNodeRegisterReq()
.setConfigNodeLocation(generateTConfigNodeLocation(-1, configNodeWrapper))
.setClusterParameters(generateClusterParameters().setClusterName(clusterName));
}
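
  /**
   * Generate the default TClusterParameters used by these tests: SimpleConsensus for the
   * ConfigNode, DataRegion and SchemaRegion consensus groups, replication factor 1, and the
   * default partitioning parameters.
   */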
public static TClusterParameters generateClusterParameters() {
TClusterParameters clusterParameters = new TClusterParameters();
clusterParameters.setClusterName("defaultCluster");
clusterParameters.setConfigNodeConsensusProtocolClass(
"org.apache.iotdb.consensus.simple.SimpleConsensus");
clusterParameters.setDataRegionConsensusProtocolClass(
"org.apache.iotdb.consensus.simple.SimpleConsensus");
clusterParameters.setSchemaRegionConsensusProtocolClass(
"org.apache.iotdb.consensus.simple.SimpleConsensus");
clusterParameters.setSeriesPartitionSlotNum(1000);
clusterParameters.setSeriesPartitionExecutorClass(
"org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor");
clusterParameters.setDefaultTTL(Long.MAX_VALUE);
clusterParameters.setTimePartitionInterval(604800000);
clusterParameters.setDataReplicationFactor(1);
clusterParameters.setSchemaReplicationFactor(1);
clusterParameters.setDataRegionPerDataNode(5.0);
clusterParameters.setSchemaRegionPerDataNode(1.0);
clusterParameters.setDiskSpaceWarningThreshold(0.01);
clusterParameters.setReadConsistencyLevel("strong");
clusterParameters.setTimestampPrecision("ms");
clusterParameters.setSchemaEngineMode("Memory");
clusterParameters.setTagAttributeTotalSize(700);
clusterParameters.setDatabaseLimitThreshold(-1);
return clusterParameters;
}
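
  /**
   * Build a TDataNodeLocation with the given node id and the client RPC, internal, MPP
   * data-exchange and consensus endpoints of the given DataNodeWrapper.
   */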
public static TDataNodeLocation generateTDataNodeLocation(
int nodeId, DataNodeWrapper dataNodeWrapper) {
TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
dataNodeLocation.setDataNodeId(nodeId);
dataNodeLocation.setClientRpcEndPoint(
new TEndPoint(dataNodeWrapper.getIp(), dataNodeWrapper.getPort()));
dataNodeLocation.setInternalEndPoint(
new TEndPoint(dataNodeWrapper.getIp(), dataNodeWrapper.getInternalPort()));
dataNodeLocation.setMPPDataExchangeEndPoint(
new TEndPoint(dataNodeWrapper.getIp(), dataNodeWrapper.getMppDataExchangePort()));
dataNodeLocation.setDataRegionConsensusEndPoint(
new TEndPoint(dataNodeWrapper.getIp(), dataNodeWrapper.getDataRegionConsensusPort()));
dataNodeLocation.setSchemaRegionConsensusEndPoint(
new TEndPoint(dataNodeWrapper.getIp(), dataNodeWrapper.getSchemaRegionConsensusPort()));
return dataNodeLocation;
}
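
  /**
   * Build a TDataNodeConfiguration that combines the generated TDataNodeLocation with a
   * TNodeResource describing the local JVM's available processors and total memory.
   */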
public static TDataNodeConfiguration generateTDataNodeConfiguration(
int nodeId, DataNodeWrapper dataNodeWrapper) {
TNodeResource dataNodeResource = new TNodeResource();
dataNodeResource.setCpuCoreNum(Runtime.getRuntime().availableProcessors());
dataNodeResource.setMaxMemory(Runtime.getRuntime().totalMemory());
return new TDataNodeConfiguration(
generateTDataNodeLocation(nodeId, dataNodeWrapper), dataNodeResource);
}
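
  /**
   * Generate a DataNode register request for the given cluster name and DataNodeWrapper; the node
   * id is set to -1 as a placeholder.
   */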
public static TDataNodeRegisterReq generateTDataNodeRegisterReq(
String clusterName, DataNodeWrapper dataNodeWrapper) {
return new TDataNodeRegisterReq(
clusterName, generateTDataNodeConfiguration(-1, dataNodeWrapper));
}
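
  // A minimal usage sketch, assuming the ConfigNode RPC client exposes registerDataNode for this
  // request type; the client, wrapper and cluster name variables are illustrative only:
  //
  //   TDataNodeRegisterReq req =
  //       ConfigNodeTestUtils.generateTDataNodeRegisterReq("defaultCluster", dataNodeWrapper);
  //   TDataNodeRegisterResp resp = configNodeClient.registerDataNode(req);

  /**
   * Generate a DataNode restart request for the given cluster name, node id and DataNodeWrapper.
   */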
public static TDataNodeRestartReq generateTDataNodeRestartReq(
String clusterName, int nodeId, DataNodeWrapper dataNodeWrapper) {
return new TDataNodeRestartReq(
clusterName, generateTDataNodeConfiguration(nodeId, dataNodeWrapper));
}
}