AINode: Correcting the Omissions and Redundancies (#13594)
* Add logic of plan deserialization in configNode
* Remove AINodeService in datanode, which is no longer used.
(cherry picked from commit ee9b752bacc6ac54ffe366d485579dab12aef6ed)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 187d258..357fd23 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -20,7 +20,13 @@
package org.apache.iotdb.confignode.consensus.request;
import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
+import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
+import org.apache.iotdb.confignode.consensus.request.read.model.GetModelInfoPlan;
+import org.apache.iotdb.confignode.consensus.request.read.model.ShowModelPlan;
import org.apache.iotdb.confignode.consensus.request.read.subscription.ShowTopicPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.ainode.UpdateAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
@@ -43,6 +49,10 @@
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan;
+import org.apache.iotdb.confignode.consensus.request.write.model.CreateModelPlan;
+import org.apache.iotdb.confignode.consensus.request.write.model.DropModelInNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan;
+import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
@@ -162,6 +172,18 @@
case RemoveDataNode:
plan = new RemoveDataNodePlan();
break;
+ case RegisterAINode:
+ plan = new RegisterAINodePlan();
+ break;
+ case RemoveAINode:
+ plan = new RemoveAINodePlan();
+ break;
+ case GetAINodeConfiguration:
+ plan = new GetAINodeConfigurationPlan();
+ break;
+ case UpdateAINodeConfiguration:
+ plan = new UpdateAINodePlan();
+ break;
case CreateDatabase:
plan = new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase);
break;
@@ -400,6 +422,24 @@
case UPDATE_CQ_LAST_EXEC_TIME:
plan = new UpdateCQLastExecTimePlan();
break;
+ case CreateModel:
+ plan = new CreateModelPlan();
+ break;
+ case UpdateModelInfo:
+ plan = new UpdateModelInfoPlan();
+ break;
+ case DropModel:
+ plan = new DropModelPlan();
+ break;
+ case ShowModel:
+ plan = new ShowModelPlan();
+ break;
+ case DropModelInNode:
+ plan = new DropModelInNodePlan();
+ break;
+ case GetModelInfo:
+ plan = new GetModelInfoPlan();
+ break;
case CreatePipePlugin:
plan = new CreatePipePluginPlan();
break;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/ainode/GetAINodeConfigurationPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/ainode/GetAINodeConfigurationPlan.java
index b080cf7..7222a8f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/ainode/GetAINodeConfigurationPlan.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/ainode/GetAINodeConfigurationPlan.java
@@ -22,10 +22,18 @@
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
public class GetAINodeConfigurationPlan extends ConfigPhysicalReadPlan {
// if aiNodeId is set to -1, return all AINode configurations.
- private final int aiNodeId;
+ private int aiNodeId;
+
+ public GetAINodeConfigurationPlan() {
+ super(ConfigPhysicalPlanType.GetAINodeConfiguration);
+ }
public GetAINodeConfigurationPlan(final int aiNodeId) {
super(ConfigPhysicalPlanType.GetAINodeConfiguration);
@@ -37,6 +45,27 @@
}
@Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    // Layout: plan type (short) followed by the AINode id (int).
+    final int planType = getType().getPlanType();
+    stream.writeShort(planType);
+    stream.writeInt(aiNodeId);
+  }
+
+  /**
+   * Restores {@code aiNodeId} from the layout produced by {@code serializeImpl}.
+   *
+   * <p>The plan-type short is not read here, so the caller is expected to have consumed it.
+   *
+   * @param buffer serialized plan content starting at the AINode id
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    final int restoredId = buffer.getInt();
+    this.aiNodeId = restoredId;
+  }
+
+ @Override
public boolean equals(final Object o) {
if (this == o) {
return true;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/GetModelInfoPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/GetModelInfoPlan.java
index dfec065..9c33c26 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/GetModelInfoPlan.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/GetModelInfoPlan.java
@@ -23,11 +23,20 @@
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoReq;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Objects;
public class GetModelInfoPlan extends ConfigPhysicalReadPlan {
- private final String modelId;
+ private String modelId;
+
+ public GetModelInfoPlan() {
+ super(ConfigPhysicalPlanType.GetModelInfo);
+ }
public GetModelInfoPlan(final TGetModelInfoReq getModelInfoReq) {
super(ConfigPhysicalPlanType.GetModelInfo);
@@ -39,6 +48,27 @@
}
@Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    // Layout: plan type (short) followed by the model id string.
+    final int planType = getType().getPlanType();
+    stream.writeShort(planType);
+    ReadWriteIOUtils.write(modelId, stream);
+  }
+
+  /**
+   * Restores {@code modelId} from the layout produced by {@code serializeImpl}.
+   *
+   * <p>The plan-type short is not read here, so the caller is expected to have consumed it.
+   *
+   * @param buffer serialized plan content starting at the model id
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    final String restoredId = ReadWriteIOUtils.readString(buffer);
+    this.modelId = restoredId;
+  }
+
+ @Override
public boolean equals(final Object o) {
if (this == o) {
return true;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java
index c3d0ff7..df924c9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java
@@ -23,12 +23,21 @@
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Objects;
public class ShowModelPlan extends ConfigPhysicalReadPlan {
private String modelName;
+ public ShowModelPlan() {
+ super(ConfigPhysicalPlanType.ShowModel);
+ }
+
public ShowModelPlan(final TShowModelReq showModelReq) {
super(ConfigPhysicalPlanType.ShowModel);
if (showModelReq.isSetModelId()) {
@@ -45,6 +54,34 @@
}
@Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    // Layout: plan type (short), presence flag (boolean), then the model name only when the
+    // flag is true. deserializeImpl reads the name only after a true flag, so writing it
+    // unconditionally would leave unread bytes behind the plan whenever modelName is null.
+    stream.writeShort(getType().getPlanType());
+    final boolean hasModelName = modelName != null;
+    ReadWriteIOUtils.write(hasModelName, stream);
+    if (hasModelName) {
+      ReadWriteIOUtils.write(modelName, stream);
+    }
+  }
+
+  /**
+   * Restores {@code modelName} from the layout produced by {@code serializeImpl}.
+   *
+   * <p>The plan-type short is not read here, so the caller is expected to have consumed it.
+   *
+   * @param buffer serialized plan content starting at the presence flag
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    final boolean hasModelName = ReadWriteIOUtils.readBool(buffer);
+    if (hasModelName) {
+      this.modelName = ReadWriteIOUtils.readString(buffer);
+    }
+  }
+
+ @Override
public boolean equals(final Object o) {
if (this == o) {
return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 07efa4d..bcc3cbe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -439,8 +439,8 @@
/** Compact the unsequence files into the overlapped sequence files */
private volatile boolean enableCrossSpaceCompaction = true;
- /** Enable the service for AINode */
- private boolean enableAINodeService = false;
+ /** Enable auto repair compaction */
+ private volatile boolean enableAutoRepairCompaction = true;
/** The buffer for sort operation */
private long sortBufferSize = 1024 * 1024L;
@@ -920,9 +920,6 @@
/** Internal port for coordinator */
private int internalPort = 10730;
- /** Port for AINode */
- private int aiNodePort = 10780;
-
/** Internal port for dataRegion consensus protocol */
private int dataRegionConsensusPort = 10760;
@@ -2869,12 +2866,12 @@
this.enableCrossSpaceCompaction = enableCrossSpaceCompaction;
}
- public boolean isEnableAINodeService() {
- return enableAINodeService;
+ public boolean isEnableAutoRepairCompaction() {
+ return enableAutoRepairCompaction;
}
- public void setEnableAINodeService(boolean enableAINodeService) {
- this.enableAINodeService = enableAINodeService;
+ public void setEnableAutoRepairCompaction(boolean enableAutoRepairCompaction) {
+ this.enableAutoRepairCompaction = enableAutoRepairCompaction;
}
public InnerSequenceCompactionSelector getInnerSequenceCompactionSelector() {
@@ -3155,14 +3152,6 @@
this.internalPort = internalPort;
}
- public int getAINodePort() {
- return aiNodePort;
- }
-
- public void setAINodePort(int aiNodePort) {
- this.aiNodePort = aiNodePort;
- }
-
public int getDataRegionConsensusPort() {
return dataRegionConsensusPort;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 442cdbb..28b766d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -287,19 +287,6 @@
.getProperty(IoTDBConstant.DN_RPC_PORT, Integer.toString(conf.getRpcPort()))
.trim()));
- conf.setEnableAINodeService(
- Boolean.parseBoolean(
- properties
- .getProperty(
- "enable_ainode_rpc_service", Boolean.toString(conf.isEnableAINodeService()))
- .trim()));
-
- conf.setAINodePort(
- Integer.parseInt(
- properties
- .getProperty("ainode_rpc_port", Integer.toString(conf.getAINodePort()))
- .trim()));
-
conf.setBufferedArraysMemoryProportion(
Double.parseDouble(
properties
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/AINodeRPCServiceThriftHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/AINodeRPCServiceThriftHandler.java
deleted file mode 100644
index c5969f8..0000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/AINodeRPCServiceThriftHandler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iotdb.db.protocol.thrift.handler;
-
-import org.apache.iotdb.db.protocol.thrift.impl.IAINodeRPCServiceWithHandler;
-
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TTransport;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class AINodeRPCServiceThriftHandler implements TServerEventHandler {
-
- private final AtomicLong thriftConnectionNumber = new AtomicLong(0);
- private final IAINodeRPCServiceWithHandler eventHandler;
-
- public AINodeRPCServiceThriftHandler(IAINodeRPCServiceWithHandler eventHandler) {
- this.eventHandler = eventHandler;
- }
-
- @Override
- public ServerContext createContext(TProtocol in, TProtocol out) {
- thriftConnectionNumber.incrementAndGet();
- return null;
- }
-
- @Override
- public void deleteContext(ServerContext arg0, TProtocol in, TProtocol out) {
- thriftConnectionNumber.decrementAndGet();
- eventHandler.handleExit();
- }
-
- @Override
- public void preServe() {
- // do nothing
- }
-
- @Override
- public void processContext(
- ServerContext serverContext, TTransport tTransport, TTransport tTransport1) {
- // do nothing
- }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/AINodeRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/AINodeRPCServiceImpl.java
deleted file mode 100644
index 68e492c..0000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/AINodeRPCServiceImpl.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.protocol.thrift.impl;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
-import org.apache.iotdb.db.protocol.session.IClientSession;
-import org.apache.iotdb.db.protocol.session.InternalClientSession;
-import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.protocol.thrift.OperationType;
-import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
-import org.apache.iotdb.db.queryengine.plan.Coordinator;
-import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
-import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
-import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
-import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
-import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
-import org.apache.iotdb.db.utils.QueryDataSetUtils;
-import org.apache.iotdb.db.utils.SetThreadName;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchMoreDataReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchMoreDataResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesResp;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.TException;
-import org.apache.tsfile.utils.Pair;
-
-import java.nio.ByteBuffer;
-import java.time.ZoneId;
-import java.util.List;
-
-public class AINodeRPCServiceImpl implements IAINodeRPCServiceWithHandler {
-
- public static final String AI_METRICS_PATH_PREFIX = "root.__system.AI.exp";
-
- private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
-
- private static final Coordinator COORDINATOR = Coordinator.getInstance();
-
- private final IPartitionFetcher partitionFetcher;
-
- private final ISchemaFetcher schemaFetcher;
-
- private final IClientSession session;
-
- public AINodeRPCServiceImpl() {
- super();
- partitionFetcher = ClusterPartitionFetcher.getInstance();
- schemaFetcher = ClusterSchemaFetcher.getInstance();
- session = new InternalClientSession("AINodeService");
- SESSION_MANAGER.registerSession(session);
- SESSION_MANAGER.supplySession(session, "AINode", ZoneId.systemDefault(), ClientVersion.V_1_0);
- }
-
- @Override
- public TFetchTimeseriesResp fetchTimeseries(TFetchTimeseriesReq req) throws TException {
- boolean finished = false;
- TFetchTimeseriesResp resp = new TFetchTimeseriesResp();
- Throwable t = null;
- try {
-
- Statement s = StatementGenerator.createStatement(req, session.getZoneId());
-
- if (s == null) {
- resp.setStatus(
- RpcUtils.getStatus(
- TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
- return resp;
- }
-
- long queryId =
- SESSION_MANAGER.requestQueryId(session, SESSION_MANAGER.requestStatementId(session));
- ExecutionResult result =
- COORDINATOR.executeForTreeModel(
- s,
- queryId,
- SESSION_MANAGER.getSessionInfo(session),
- "",
- partitionFetcher,
- schemaFetcher,
- req.getTimeout());
-
- if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- resp.setStatus(result.status);
- return resp;
- }
-
- IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
-
- try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
-
- DatasetHeader header = queryExecution.getDatasetHeader();
- resp.setStatus(result.status);
- resp.setColumnNameList(header.getRespColumns());
- resp.setColumnTypeList(header.getRespDataTypeList());
- resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
- resp.setQueryId(queryId);
-
- Pair<List<ByteBuffer>, Boolean> pair =
- QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize);
- resp.setTsDataset(pair.left);
- finished = pair.right;
- resp.setHasMoreData(!finished);
- return resp;
- }
- } catch (Exception e) {
- finished = true;
- t = e;
- resp.setStatus(ErrorHandlingUtils.onQueryException(e, OperationType.EXECUTE_STATEMENT));
- return resp;
- } catch (Error error) {
- t = error;
- throw error;
- } finally {
- if (finished) {
- COORDINATOR.cleanupQueryExecution(resp.queryId, req, t);
- }
- }
- }
-
- @Override
- public TFetchMoreDataResp fetchMoreData(TFetchMoreDataReq req) throws TException {
- TFetchMoreDataResp resp = new TFetchMoreDataResp();
- boolean finished = false;
- Throwable t = null;
- try {
- IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId);
- resp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
-
- if (queryExecution == null) {
- resp.setHasMoreData(false);
- return resp;
- }
-
- try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
- Pair<List<ByteBuffer>, Boolean> pair =
- QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize);
- List<ByteBuffer> result = pair.left;
- finished = pair.right;
- resp.setTsDataset(result);
- resp.setHasMoreData(!finished);
- return resp;
- }
- } catch (Exception e) {
- finished = true;
- t = e;
- resp.setStatus(ErrorHandlingUtils.onQueryException(e, OperationType.FETCH_RESULTS));
- return resp;
- } catch (Error error) {
- t = error;
- throw error;
- } finally {
- if (finished) {
- COORDINATOR.cleanupQueryExecution(req.queryId, req, t);
- }
- }
- }
-
- @Override
- public void handleExit() {
- SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution);
- }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/IAINodeRPCServiceWithHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/IAINodeRPCServiceWithHandler.java
deleted file mode 100644
index 7d9df50..0000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/IAINodeRPCServiceWithHandler.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.protocol.thrift.impl;
-
-import org.apache.iotdb.mpp.rpc.thrift.IAINodeInternalRPCService;
-
-public interface IAINodeRPCServiceWithHandler extends IAINodeInternalRPCService.Iface {
- void handleExit();
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCService.java
deleted file mode 100644
index 5ec49e7..0000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCService.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.service;
-
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
-import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.service.ThriftService;
-import org.apache.iotdb.commons.service.ThriftServiceThread;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.protocol.thrift.handler.AINodeRPCServiceThriftHandler;
-import org.apache.iotdb.db.protocol.thrift.impl.AINodeRPCServiceImpl;
-import org.apache.iotdb.mpp.rpc.thrift.IAINodeInternalRPCService;
-import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
-
-public class AINodeRPCService extends ThriftService implements AINodeRPCServiceMBean {
-
- private AINodeRPCServiceImpl impl;
-
- private AINodeRPCService() {}
-
- @Override
- public ServiceType getID() {
- return ServiceType.AINode_RPC_SERVICE;
- }
-
- @Override
- public void initTProcessor() {
- impl = new AINodeRPCServiceImpl();
- initSyncedServiceImpl(null);
- processor = new IAINodeInternalRPCService.Processor<>(impl);
- }
-
- @Override
- public void initThriftServiceThread()
- throws IllegalAccessException, InstantiationException, ClassNotFoundException {
- try {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- thriftServiceThread =
- new ThriftServiceThread(
- processor,
- getID().getName(),
- ThreadName.AINODE_RPC_SERVICE.getName(),
- getBindIP(),
- getBindPort(),
- config.getRpcMaxConcurrentClientNum(),
- config.getThriftServerAwaitTimeForStopService(),
- new AINodeRPCServiceThriftHandler(impl),
- config.isRpcThriftCompressionEnable(),
- DeepCopyRpcTransportFactory.INSTANCE);
- } catch (RPCServiceException e) {
- throw new IllegalAccessException(e.getMessage());
- }
- thriftServiceThread.setName(ThreadName.AINODE_RPC_SERVICE.getName());
- }
-
- @Override
- public String getBindIP() {
- return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
- }
-
- @Override
- public int getBindPort() {
- return IoTDBDescriptor.getInstance().getConfig().getAINodePort();
- }
-
- private static class AINodeRPCServiceHolder {
- private static final AINodeRPCService INSTANCE = new AINodeRPCService();
-
- private AINodeRPCServiceHolder() {}
- }
-
- public static AINodeRPCService getInstance() {
- return AINodeRPCServiceHolder.INSTANCE;
- }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCServiceMBean.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCServiceMBean.java
deleted file mode 100644
index f4f51c0..0000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCServiceMBean.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.service;
-
-public interface AINodeRPCServiceMBean {}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 7c83e68..c8f0b98 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -814,10 +814,6 @@
private void setUpRPCService() throws StartupException {
// Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling
registerManager.register(DataNodeInternalRPCService.getInstance());
- // Start InternalRPCService to indicate that the current DataNode can accept request from AINode
- if (config.isEnableAINodeService()) {
- registerManager.register(AINodeRPCService.getInstance());
- }
// Notice: During the period between starting the internal RPC service
// and starting the client RPC service , some requests may fail because
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 76545f5..8c6c884 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -1047,17 +1047,4 @@
/** Empty rpc, only for connection test */
common.TSStatus testConnectionEmptyRPC()
-}
-
-service IAINodeInternalRPCService{
- /**
- * Fecth the data of the specified time series
- */
- TFetchTimeseriesResp fetchTimeseries(TFetchTimeseriesReq req)
-
- /**
- * Fetch rest data for a specified fetchTimeseries
- */
- TFetchMoreDataResp fetchMoreData(TFetchMoreDataReq req)
-
-}
+}
\ No newline at end of file