blob: 471a9b5e446f079d0ca6b055991d6655dc78ac21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.service.thrift;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.ConfigNodeStartupCheck;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.ratis.util.FileUtils;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class ConfigNodeRPCServiceProcessorTest {
ConfigNodeRPCServiceProcessor processor;
@BeforeClass
public static void beforeClass() throws StartupException, ConfigurationException, IOException {
final ConfigNodeConfig configNodeConfig = ConfigNodeDescriptor.getInstance().getConf();
UDFExecutableManager.setupAndGetInstance(
configNodeConfig.getTemporaryLibDir(), configNodeConfig.getUdfLibDir());
UDFClassLoaderManager.setupAndGetInstance(configNodeConfig.getUdfLibDir());
UDFRegistrationService.setupAndGetInstance(configNodeConfig.getSystemUdfDir());
ConfigNodeStartupCheck.getInstance().startUpCheck();
}
@Before
public void before() throws IOException {
processor = new ConfigNodeRPCServiceProcessor(new ConfigManager());
processor.getConsensusManager().singleCopyMayWaitUntilLeaderReady();
}
@After
public void after() throws IOException {
processor.close();
FileUtils.deleteFully(new File(ConfigNodeDescriptor.getInstance().getConf().getConsensusDir()));
FileUtils.deleteFully(
new File(CommonDescriptor.getInstance().getConfig().getProcedureWalFolder()));
}
@AfterClass
public static void afterClass() throws IOException {
UDFExecutableManager.getInstance().stop();
UDFClassLoaderManager.getInstance().stop();
UDFRegistrationService.getInstance().stop();
FileUtils.deleteFully(new File(ConfigNodeConstant.DATA_DIR));
}
private void checkGlobalConfig(TGlobalConfig globalConfig) {
Assert.assertEquals(
ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass(),
globalConfig.getDataRegionConsensusProtocolClass());
Assert.assertEquals(
ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionConsensusProtocolClass(),
globalConfig.getSchemaRegionConsensusProtocolClass());
Assert.assertEquals(
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum(),
globalConfig.getSeriesPartitionSlotNum());
Assert.assertEquals(
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
globalConfig.getSeriesPartitionExecutorClass());
}
private void registerDataNodes() throws TException {
for (int i = 0; i < 3; i++) {
TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
dataNodeLocation.setDataNodeId(-1);
dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
TDataNodeConfiguration dataNodeConfiguration = new TDataNodeConfiguration();
dataNodeConfiguration.setLocation(dataNodeLocation);
dataNodeConfiguration.setResource(new TNodeResource(8, 1024 * 1024));
TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeConfiguration);
TDataNodeRegisterResp resp = processor.registerDataNode(req);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
Assert.assertEquals(i, resp.getDataNodeId());
checkGlobalConfig(resp.getGlobalConfig());
}
}
@Test
public void testSetAndQueryStorageGroup() throws IllegalPathException, TException {
TSStatus status;
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
// register DataNodes
registerDataNodes();
// set StorageGroup0 by default values
TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
status = processor.setStorageGroup(setReq0);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// set StorageGroup1 by specific values
TSetStorageGroupReq setReq1 =
new TSetStorageGroupReq(
new TStorageGroupSchema(sg1)
.setTTL(1024L)
.setSchemaReplicationFactor(5)
.setDataReplicationFactor(5)
.setTimePartitionInterval(2048L));
status = processor.setStorageGroup(setReq1);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// test count all StorageGroups
TCountStorageGroupResp countResp =
processor.countMatchedStorageGroups(Arrays.asList("root", "**"));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), countResp.getStatus().getCode());
Assert.assertEquals(2, countResp.getCount());
// test count one StorageGroup
countResp = processor.countMatchedStorageGroups(Arrays.asList("root", "sg0"));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), countResp.getStatus().getCode());
Assert.assertEquals(1, countResp.getCount());
// test query all StorageGroupSchemas
TStorageGroupSchemaResp getResp =
processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "**"));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), getResp.getStatus().getCode());
Map<String, TStorageGroupSchema> schemaMap = getResp.getStorageGroupSchemaMap();
Assert.assertEquals(2, schemaMap.size());
TStorageGroupSchema storageGroupSchema = schemaMap.get(sg0);
Assert.assertNotNull(storageGroupSchema);
Assert.assertEquals(sg0, storageGroupSchema.getName());
Assert.assertEquals(Long.MAX_VALUE, storageGroupSchema.getTTL());
Assert.assertEquals(1, storageGroupSchema.getSchemaReplicationFactor());
Assert.assertEquals(1, storageGroupSchema.getDataReplicationFactor());
Assert.assertEquals(86400000, storageGroupSchema.getTimePartitionInterval());
storageGroupSchema = schemaMap.get(sg1);
Assert.assertNotNull(storageGroupSchema);
Assert.assertEquals(sg1, storageGroupSchema.getName());
Assert.assertEquals(1024L, storageGroupSchema.getTTL());
Assert.assertEquals(5, storageGroupSchema.getSchemaReplicationFactor());
Assert.assertEquals(5, storageGroupSchema.getDataReplicationFactor());
Assert.assertEquals(2048L, storageGroupSchema.getTimePartitionInterval());
// test fail by re-register
status = processor.setStorageGroup(setReq0);
Assert.assertEquals(
TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode(), status.getCode());
// test StorageGroup setter interfaces
PartialPath patternPath = new PartialPath(sg1);
status =
processor.setTTL(new TSetTTLReq(Arrays.asList(patternPath.getNodes()), Long.MAX_VALUE));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
status = processor.setSchemaReplicationFactor(new TSetSchemaReplicationFactorReq(sg1, 1));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
status = processor.setDataReplicationFactor(new TSetDataReplicationFactorReq(sg1, 1));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
status = processor.setTimePartitionInterval(new TSetTimePartitionIntervalReq(sg1, 604800L));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// test setter results
getResp = processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "sg1"));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), getResp.getStatus().getCode());
schemaMap = getResp.getStorageGroupSchemaMap();
Assert.assertEquals(1, schemaMap.size());
storageGroupSchema = schemaMap.get(sg1);
Assert.assertNotNull(storageGroupSchema);
Assert.assertEquals(sg1, storageGroupSchema.getName());
Assert.assertEquals(Long.MAX_VALUE, storageGroupSchema.getTTL());
Assert.assertEquals(1, storageGroupSchema.getSchemaReplicationFactor());
Assert.assertEquals(1, storageGroupSchema.getDataReplicationFactor());
Assert.assertEquals(604800, storageGroupSchema.getTimePartitionInterval());
}
/** Generate a PatternTree and serialize it into a ByteBuffer */
private ByteBuffer generatePatternTreeBuffer(String[] paths)
throws IllegalPathException, IOException {
PathPatternTree patternTree = new PathPatternTree();
for (String path : paths) {
patternTree.appendPathPattern(new PartialPath(path));
}
patternTree.constructTree();
PublicBAOS baos = new PublicBAOS();
patternTree.serialize(baos);
return ByteBuffer.wrap(baos.toByteArray());
}
@Test
public void testDeleteStorageGroup() throws TException {
TSStatus status;
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
// register DataNodes
registerDataNodes();
ConfigNodeProcedureEnv.setSkipForTest(true);
TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
// set StorageGroup0 by default values
status = processor.setStorageGroup(setReq0);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// set StorageGroup1 by specific values
TSetStorageGroupReq setReq1 = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
status = processor.setStorageGroup(setReq1);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
TDeleteStorageGroupsReq deleteStorageGroupsReq = new TDeleteStorageGroupsReq();
List<String> sgs = Arrays.asList(sg0, sg1);
deleteStorageGroupsReq.setPrefixPathList(sgs);
TSStatus deleteSgStatus = processor.deleteStorageGroups(deleteStorageGroupsReq);
TStorageGroupSchemaResp root =
processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "*"));
Assert.assertTrue(root.getStorageGroupSchemaMap().isEmpty());
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), deleteSgStatus.getCode());
}
@Test
public void testDeleteStorageGroupInvalidateCacheFailed() throws TException {
TSStatus status;
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
// register DataNodes
registerDataNodes();
ConfigNodeProcedureEnv.setSkipForTest(true);
ConfigNodeProcedureEnv.setInvalidCacheResult(false);
TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
// set StorageGroup0 by default values
status = processor.setStorageGroup(setReq0);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// set StorageGroup1 by specific values
TSetStorageGroupReq setReq1 = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
status = processor.setStorageGroup(setReq1);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
TDeleteStorageGroupsReq deleteStorageGroupsReq = new TDeleteStorageGroupsReq();
List<String> sgs = Arrays.asList(sg0, sg1);
deleteStorageGroupsReq.setPrefixPathList(sgs);
TSStatus deleteSgStatus = processor.deleteStorageGroups(deleteStorageGroupsReq);
TStorageGroupSchemaResp root =
processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "*"));
// rollback success
Assert.assertEquals(root.getStorageGroupSchemaMap().size(), 2);
Assert.assertEquals(TSStatusCode.MULTIPLE_ERROR.getStatusCode(), deleteSgStatus.getCode());
}
@Test
public void testGetSchemaNodeManagementPartition()
throws TException, IllegalPathException, IOException {
final String sg = "root.sg";
final int storageGroupNum = 2;
TSStatus status;
TSchemaNodeManagementReq nodeManagementReq;
TSchemaNodeManagementResp nodeManagementResp;
// register DataNodes
registerDataNodes();
// set StorageGroups
for (int i = 0; i < storageGroupNum; i++) {
TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
status = processor.setStorageGroup(setReq);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
}
ByteBuffer byteBuffer = generatePatternTreeBuffer(new String[] {"root"});
nodeManagementReq = new TSchemaNodeManagementReq(byteBuffer);
nodeManagementReq.setLevel(-1);
nodeManagementResp = processor.getSchemaNodeManagementPartition(nodeManagementReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), nodeManagementResp.getStatus().getCode());
Assert.assertEquals(2, nodeManagementResp.getMatchedNodeSize());
Assert.assertNotNull(nodeManagementResp.getSchemaRegionMap());
Assert.assertEquals(0, nodeManagementResp.getSchemaRegionMapSize());
}
}