blob: 164f4b39a9e69370530152f80256498062ff177f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.raft.jraft.rpc.impl;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import org.apache.ignite.network.NetworkMessageHandler;
import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.rpc.RpcContext;
import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
import org.apache.ignite.raft.jraft.rpc.RpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.cli.AddLearnersRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.AddPeerRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.ChangePeersRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.GetLeaderRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.GetPeersRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.RemoveLearnersRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.RemovePeerRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.ResetLearnersRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.ResetPeerRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.SnapshotRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.TransferLeaderRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.GetFileRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.InstallSnapshotRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.ReadIndexRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.RequestVoteRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.TimeoutNowRequestProcessor;
/**
* TODO https://issues.apache.org/jira/browse/IGNITE-14519 Unsubscribe on shutdown
*/
public class IgniteRpcServer implements RpcServer<Void> {
private final ClusterService service;
private final NodeManager nodeManager;
private final Executor rpcExecutor;
private final List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>();
private final Map<String, RpcProcessor> processors = new ConcurrentHashMap<>();
/**
* @param service The cluster service.
* @param nodeManager The node manager.
* @param raftMessagesFactory Message factory.
* @param rpcExecutor The executor for RPC requests.
*/
public IgniteRpcServer(
ClusterService service,
NodeManager nodeManager,
RaftMessagesFactory raftMessagesFactory,
Executor rpcExecutor
) {
this.service = service;
this.nodeManager = nodeManager;
this.rpcExecutor = rpcExecutor;
// raft server RPC
AppendEntriesRequestProcessor appendEntriesRequestProcessor =
new AppendEntriesRequestProcessor(rpcExecutor, raftMessagesFactory);
registerConnectionClosedEventListener(appendEntriesRequestProcessor);
registerProcessor(appendEntriesRequestProcessor);
registerProcessor(new GetFileRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new InstallSnapshotRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new RequestVoteRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new PingRequestProcessor(rpcExecutor, raftMessagesFactory)); // TODO asch this should go last.
registerProcessor(new TimeoutNowRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new ReadIndexRequestProcessor(rpcExecutor, raftMessagesFactory));
// raft native cli service
registerProcessor(new AddPeerRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new RemovePeerRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new ResetPeerRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new ChangePeersRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new GetLeaderRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new SnapshotRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new TransferLeaderRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new GetPeersRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new AddLearnersRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new RemoveLearnersRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new ResetLearnersRequestProcessor(rpcExecutor, raftMessagesFactory));
// common client integration
registerProcessor(new ActionRequestProcessor(rpcExecutor, raftMessagesFactory));
var messageHandler = new RpcMessageHandler();
service.messagingService().addMessageHandler(RaftMessageGroup.class, messageHandler);
service.topologyService().addEventHandler(new TopologyEventHandler() {
@Override public void onAppeared(ClusterNode member) {
// TODO asch optimize start replicator https://issues.apache.org/jira/browse/IGNITE-14843
}
@Override public void onDisappeared(ClusterNode member) {
for (ConnectionClosedEventListener listener : listeners)
listener.onClosed(service.topologyService().localMember().name(), member.name());
}
});
}
/**
* Implementation of a message handler that dispatches the incoming requests to a suitable {@link RpcProcessor}.
*/
public class RpcMessageHandler implements NetworkMessageHandler {
/** {@inheritDoc} */
@Override public void onReceived(NetworkMessage message, NetworkAddress senderAddr, String correlationId) {
Class<? extends NetworkMessage> cls = message.getClass();
RpcProcessor<NetworkMessage> prc = processors.get(cls.getName());
// TODO asch cache mapping https://issues.apache.org/jira/browse/IGNITE-14832
if (prc == null) {
for (Class<?> iface : cls.getInterfaces()) {
prc = processors.get(iface.getName());
if (prc != null)
break;
}
}
if (prc == null)
return;
RpcProcessor.ExecutorSelector selector = prc.executorSelector();
Executor executor = null;
if (selector != null)
executor = selector.select(prc.getClass().getName(), message, nodeManager);
if (executor == null)
executor = prc.executor();
if (executor == null)
executor = rpcExecutor;
RpcProcessor<NetworkMessage> finalPrc = prc;
executor.execute(() -> {
var context = new RpcContext() {
@Override public NodeManager getNodeManager() {
return nodeManager;
}
@Override public void sendResponse(Object responseObj) {
service.messagingService().send(senderAddr, (NetworkMessage) responseObj, correlationId);
}
@Override public NetworkAddress getRemoteAddress() {
return senderAddr;
}
@Override public NetworkAddress getLocalAddress() {
return service.topologyService().localMember().address();
}
};
finalPrc.handleRequest(context, message);
});
}
}
/** {@inheritDoc} */
@Override public void registerConnectionClosedEventListener(ConnectionClosedEventListener listener) {
if (!listeners.contains(listener))
listeners.add(listener);
}
/** {@inheritDoc} */
@Override public void registerProcessor(RpcProcessor<?> processor) {
processors.put(processor.interest(), processor);
}
/** {@inheritDoc} */
@Override public int boundPort() {
return 0;
}
/** {@inheritDoc} */
@Override public boolean init(Void opts) {
return true;
}
public ClusterService clusterService() {
return service;
}
/** {@inheritDoc} */
@Override public void shutdown() {
}
}