IGNITE-15288 Full-featured RaftGroupService - Fixes #321.
Signed-off-by: Alexey Scherbakov <alexey.scherbakoff@gmail.com>
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 63f84f7..6d78dbc 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -85,7 +85,7 @@
public static Ignite startClient(String... addrs) {
if (addrs == null || addrs.length == 0)
- addrs = new String[]{"127.0.0.2:" + serverPort};
+ addrs = new String[]{"127.0.0.1:" + serverPort};
var builder = IgniteClient.builder().addresses(addrs);
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index 8a2f3d2..98e2237 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -49,9 +49,9 @@
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
@@ -97,7 +97,7 @@
private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";
/** Factory. */
- private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
+ private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
/** Network factory. */
private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
index 451d3a4..b92ff9e 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
@@ -25,6 +25,7 @@
/**
* A participant of a replication group.
*/
+// TODO: IGNITE-15506 Replace it by jraft Peer
public final class Peer implements Serializable {
/**
* Network address.
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftErrorCode.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftErrorCode.java
deleted file mode 100644
index 430ce84..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftErrorCode.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client;
-
-/**
- * Public error codes for raft protocol.
- */
-public enum RaftErrorCode {
- /** */
- NO_LEADER(1001, "No leader is elected"),
-
- /** */
- LEADER_CHANGED(1002, "Stale leader"),
-
- /** */
- ILLEGAL_STATE(1003, "Internal server error"),
-
- /** */
- BUSY(1004, "A peer is overloaded, retry later"),
-
- /** */
- SNAPSHOT(1005, "Snapshot error"),
-
- /** */
- STATE_MACHINE(1006, "Unrecoverable state machine error");
-
- /** */
- private final int code;
-
- /** */
- private final String desc;
-
- /**
- * @param code The code.
- * @param desc The desctiption.
- */
- RaftErrorCode(int code, String desc) {
- this.code = code;
- this.desc = desc;
- }
-
- /**
- * @return The code.
- */
- public int code() {
- return code;
- }
-
- /**
- * @return The description.
- */
- public String description() {
- return desc;
- }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/exception/RaftException.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/exception/RaftException.java
deleted file mode 100644
index 4ad4b1f..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/exception/RaftException.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.exception;
-
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.raft.client.RaftErrorCode;
-
-/**
- * A raft exception containing code and description.
- */
-public class RaftException extends IgniteInternalException {
- private final RaftErrorCode code;
-
- /**
- * @param errCode Error code.
- */
- public RaftException(RaftErrorCode errCode) {
- this.code = errCode;
- }
-
- /**
- * @param errCode Error code.
- * @param message Error message.
- */
- public RaftException(RaftErrorCode errCode, String message) {
- super(message);
-
- this.code = errCode;
- }
-
- /**
- * @return Error code.
- */
- public RaftErrorCode errorCode() {
- return code;
- }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequest.java
deleted file mode 100644
index 936bda4..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-import org.apache.ignite.raft.client.Peer;
-
-/**
- * Add learners.
- */
-@Transferable(value = RaftClientMessageGroup.ADD_LEARNERS_REQUEST, autoSerializable = false)
-public interface AddLearnersRequest extends NetworkMessage, Serializable {
- /**
- * @return Group id.
- */
- String groupId();
-
- /**
- * @return List of learners.
- */
- List<Peer> learners();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeersRequest.java
deleted file mode 100644
index 91da79e..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeersRequest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-import org.apache.ignite.raft.client.Peer;
-
-/**
- * Add peers.
- */
-@Transferable(value = RaftClientMessageGroup.ADD_PEERS_REQUEST, autoSerializable = false)
-public interface AddPeersRequest extends NetworkMessage, Serializable {
- /**
- * @return Group id.
- */
- String groupId();
-
- /**
- * @return Peers.
- */
- List<Peer> peers();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeersResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeersResponse.java
deleted file mode 100644
index 79858d4..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeersResponse.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-import org.apache.ignite.raft.client.Peer;
-
-/**
- * Change peers result.
- */
-@Transferable(value = RaftClientMessageGroup.CHANGE_PEERS_RESPONSE, autoSerializable = false)
-public interface ChangePeersResponse extends NetworkMessage, Serializable {
- /**
- * @return Old peers.
- */
- List<Peer> oldPeers();
-
- /**
- * @return New peers.
- */
- List<Peer> newPeers();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequest.java
deleted file mode 100644
index 35aa99d..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-
-/**
- * Get leader.
- */
-@Transferable(value = RaftClientMessageGroup.GET_LEADER_REQUEST, autoSerializable = false)
-public interface GetLeaderRequest extends NetworkMessage, Serializable {
- /**
- * @return Group id.
- */
- String groupId();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponse.java
deleted file mode 100644
index e8d8349..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponse.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-import org.apache.ignite.raft.client.Peer;
-
-/**
- * A current leader.
- */
-@Transferable(value = RaftClientMessageGroup.GET_LEADER_RESPONSE, autoSerializable = false)
-public interface GetLeaderResponse extends NetworkMessage, Serializable {
- /**
- * @return The leader.
- */
- Peer leader();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequest.java
deleted file mode 100644
index 3fd5db1..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-
-/** Get peers. */
-@Transferable(value = RaftClientMessageGroup.GET_PEERS_REQUEST, autoSerializable = false)
-public interface GetPeersRequest extends NetworkMessage, Serializable {
- /**
- * @return Group id.
- */
- String groupId();
-
- /**
- * @return {@code True} to list only alive nodes.
- */
- boolean onlyAlive();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersResponse.java
deleted file mode 100644
index 28b0a0f..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersResponse.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-import org.apache.ignite.raft.client.Peer;
-
-/**
- *
- */
-@Transferable(value = RaftClientMessageGroup.GET_PEERS_RESPONSE, autoSerializable = false)
-public interface GetPeersResponse extends NetworkMessage, Serializable {
- /**
- * @return Current peers.
- */
- List<Peer> peers();
-
- /**
- * @return Current leaners.
- */
- List<Peer> learners();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageGroup.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageGroup.java
deleted file mode 100644
index aa30eb5..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageGroup.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import org.apache.ignite.network.annotations.MessageGroup;
-
-/**
- * Message types for the raft-client module.
- */
-@MessageGroup(groupType = 2, groupName = "RaftClientMessages")
-public class RaftClientMessageGroup {
- /**
- * Message type for {@link ActionRequest}.
- */
- public static final short ACTION_REQUEST = 0;
-
- /**
- * Message type for {@link ActionResponse}.
- */
- public static final short ACTION_RESPONSE = 1;
-
- /**
- * Message type for {@link AddLearnersRequest}.
- */
- public static final short ADD_LEARNERS_REQUEST = 2;
-
- /**
- * Message type for {@link AddPeersRequest}.
- */
- public static final short ADD_PEERS_REQUEST = 3;
-
- /**
- * Message type for {@link ChangePeersResponse}.
- */
- public static final short CHANGE_PEERS_RESPONSE = 4;
-
- /**
- * Message type for {@link GetLeaderRequest}.
- */
- public static final short GET_LEADER_REQUEST = 5;
-
- /**
- * Message type for {@link GetLeaderResponse}.
- */
- public static final short GET_LEADER_RESPONSE = 6;
-
- /**
- * Message type for {@link GetPeersRequest}.
- */
- public static final short GET_PEERS_REQUEST = 7;
-
- /**
- * Message type for {@link GetPeersResponse}.
- */
- public static final short GET_PEERS_RESPONSE = 8;
-
- /**
- * Message type for {@link RaftErrorResponse}.
- */
- public static final short RAFT_ERROR_RESPONSE = 9;
-
- /**
- * Message type for {@link RemoveLearnersRequest}.
- */
- public static final short REMOVE_LEARNERS_REQUEST = 10;
-
- /**
- * Message type for {@link RemovePeersRequest}.
- */
- public static final short REMOVE_PEERS_REQUEST = 11;
-
- /**
- * Message type for {@link SnapshotRequest}.
- */
- public static final short SNAPSHOT_REQUEST = 12;
-
- /**
- * Message type for {@link TransferLeadershipRequest}.
- */
- public static final short TRANSFER_LEADERSHIP_REQUEST = 13;
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftErrorResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftErrorResponse.java
deleted file mode 100644
index 96bc16b..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftErrorResponse.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.RaftErrorCode;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Raft error response.
- */
-@Transferable(value = RaftClientMessageGroup.RAFT_ERROR_RESPONSE, autoSerializable = false)
-public interface RaftErrorResponse extends NetworkMessage, Serializable {
- /**
- * @return Error code.
- */
- public RaftErrorCode errorCode();
-
- /**
- * @return Error message.
- */
- public String errorMessage();
-
- /**
- * @return The new leader if a current leader is obsolete or null if not applicable.
- */
- public @Nullable Peer newLeader();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequest.java
deleted file mode 100644
index eb549a3..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-import org.apache.ignite.raft.client.Peer;
-
-/**
- * Remove learners.
- */
-@Transferable(value = RaftClientMessageGroup.REMOVE_LEARNERS_REQUEST, autoSerializable = false)
-public interface RemoveLearnersRequest extends NetworkMessage, Serializable {
- /**
- * @return Group id.
- */
- String groupId();
-
- /**
- * @return Learners to remove.
- */
- List<Peer> learners();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeersRequest.java
deleted file mode 100644
index 777e0b0..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeersRequest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-import org.apache.ignite.raft.client.Peer;
-
-/**
- * Remove peers.
- */
-@Transferable(value = RaftClientMessageGroup.REMOVE_PEERS_REQUEST, autoSerializable = false)
-public interface RemovePeersRequest extends NetworkMessage, Serializable {
- /**
- * @return Group id.
- */
- String groupId();
-
- /**
- * @return Peers to remove.
- */
- List<Peer> peers();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequest.java
deleted file mode 100644
index 9e34f4c..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-
-/**
- * Take a local snapshot on the peer.
- */
-@Transferable(value = RaftClientMessageGroup.SNAPSHOT_REQUEST, autoSerializable = false)
-public interface SnapshotRequest extends NetworkMessage, Serializable {
- /**
- * @return Group id.
- */
- String groupId();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeadershipRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeadershipRequest.java
deleted file mode 100644
index c123757..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeadershipRequest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.message;
-
-import java.io.Serializable;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-import org.apache.ignite.raft.client.Peer;
-
-/**
- * Transfer a leadership to receiving peer.
- */
-@Transferable(value = RaftClientMessageGroup.TRANSFER_LEADERSHIP_REQUEST, autoSerializable = false)
-public interface TransferLeadershipRequest extends NetworkMessage, Serializable {
- /**
- * @return Group id.
- */
- String groupId();
-
- /**
- * @return New leader.
- */
- Peer newLeader();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
index 778b8aa..aab2178 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
@@ -104,20 +104,33 @@
CompletableFuture<Void> refreshMembers(boolean onlyAlive);
/**
- * Adds a voting peers to the replication group.
+ * Adds a voting peer to the replication group.
* <p>
* After the future completion methods like {@link #peers()} and {@link #learners()}
* can be used to retrieve current members of a group.
* <p>
* This operation is executed on a group leader.
*
- * @param peers Peers.
+ * @param peer Peer
* @return A future.
*/
- CompletableFuture<Void> addPeers(List<Peer> peers);
+ CompletableFuture<Void> addPeer(Peer peer);
/**
- * Removes peers from the replication group.
+ * Removes peer from the replication group.
+ * <p>
+ * After the future completion methods like {@link #peers()} and {@link #learners()}
+ * can be used to retrieve current members of a group.
+ * <p>
+ * This operation is executed on a group leader.
+ *
+ * @param peer Peer.
+ * @return A future.
+ */
+ CompletableFuture<Void> removePeer(Peer peer);
+
+ /**
+ * Changes peers of the replication group.
* <p>
* After the future completion methods like {@link #peers()} and {@link #learners()}
* can be used to retrieve current members of a group.
@@ -127,7 +140,7 @@
* @param peers Peers.
* @return A future.
*/
- CompletableFuture<Void> removePeers(List<Peer> peers);
+ CompletableFuture<Void> changePeers(List<Peer> peers);
/**
* Adds learners (non-voting members).
@@ -156,6 +169,19 @@
CompletableFuture<Void> removeLearners(List<Peer> learners);
/**
+ * Set learners of the raft group to needed list of learners.
+ * <p>
+ * After the future completion methods like {@link #peers()} and {@link #learners()}
+ * can be used to retrieve current members of a group.
+ * <p>
+ * This operation is executed on a group leader.
+ *
+ * @param learners List of learners.
+ * @return A future.
+ */
+ CompletableFuture<Void> resetLearners(List<Peer> learners);
+
+ /**
* Takes a state machine snapshot on a given group peer.
*
* @param peer Peer.
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
deleted file mode 100644
index 246211a..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.service.impl;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.raft.client.Command;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.ReadCommand;
-import org.apache.ignite.raft.client.exception.RaftException;
-import org.apache.ignite.raft.client.message.ActionRequest;
-import org.apache.ignite.raft.client.message.ActionResponse;
-import org.apache.ignite.raft.client.message.AddLearnersRequest;
-import org.apache.ignite.raft.client.message.AddPeersRequest;
-import org.apache.ignite.raft.client.message.ChangePeersResponse;
-import org.apache.ignite.raft.client.message.GetLeaderRequest;
-import org.apache.ignite.raft.client.message.GetLeaderResponse;
-import org.apache.ignite.raft.client.message.GetPeersRequest;
-import org.apache.ignite.raft.client.message.GetPeersResponse;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
-import org.apache.ignite.raft.client.message.RaftErrorResponse;
-import org.apache.ignite.raft.client.message.RemoveLearnersRequest;
-import org.apache.ignite.raft.client.message.RemovePeersRequest;
-import org.apache.ignite.raft.client.message.SnapshotRequest;
-import org.apache.ignite.raft.client.message.TransferLeadershipRequest;
-import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.jetbrains.annotations.NotNull;
-
-import static java.lang.System.currentTimeMillis;
-import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.ThreadLocalRandom.current;
-import static org.apache.ignite.raft.client.RaftErrorCode.LEADER_CHANGED;
-import static org.apache.ignite.raft.client.RaftErrorCode.NO_LEADER;
-
-/**
- * The implementation of {@link RaftGroupService}
- */
-public class RaftGroupServiceImpl implements RaftGroupService {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(RaftGroupServiceImpl.class);
-
- /** */
- private volatile long timeout;
-
- /** */
- private final String groupId;
-
- /** */
- private final RaftClientMessagesFactory factory;
-
- /** */
- private volatile Peer leader;
-
- /** */
- private volatile List<Peer> peers;
-
- /** */
- private volatile List<Peer> learners;
-
- /** */
- private final ClusterService cluster;
-
- /** */
- private final long retryDelay;
-
- /** */
- private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
-
- /**
- * Constructor.
- *
- * @param groupId Group id.
- * @param cluster A cluster.
- * @param factory A message factory.
- * @param timeout Request timeout.
- * @param peers Initial group configuration.
- * @param leader Group leader.
- * @param retryDelay Retry delay.
- */
- private RaftGroupServiceImpl(
- String groupId,
- ClusterService cluster,
- RaftClientMessagesFactory factory,
- int timeout,
- List<Peer> peers,
- Peer leader,
- long retryDelay
- ) {
- this.cluster = requireNonNull(cluster);
- this.peers = requireNonNull(peers);
- this.factory = factory;
- this.timeout = timeout;
- this.groupId = groupId;
- this.retryDelay = retryDelay;
- this.leader = leader;
- }
-
- /**
- * Starts raft group service.
- *
- * @param groupId Raft group id.
- * @param cluster Cluster service.
- * @param factory Message factory.
- * @param timeout Timeout.
- * @param peers List of all peers.
- * @param getLeader {@code True} to get the group's leader upon service creation.
- * @param retryDelay Retry delay.
- * @return Future representing pending completion of the operation.
- */
- public static CompletableFuture<RaftGroupService> start(
- String groupId,
- ClusterService cluster,
- RaftClientMessagesFactory factory,
- int timeout,
- List<Peer> peers,
- boolean getLeader,
- long retryDelay
- ) {
- var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, peers, null, retryDelay);
-
- if (!getLeader) {
- return CompletableFuture.completedFuture(service);
- }
-
- return service.refreshLeader().handle((unused, throwable) -> {
- if (throwable != null)
- LOG.error("Failed to refresh a leader", throwable);
-
- return service;
- });
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull String groupId() {
- return groupId;
- }
-
- /** {@inheritDoc} */
- @Override public long timeout() {
- return timeout;
- }
-
- /** {@inheritDoc} */
- @Override public void timeout(long newTimeout) {
- this.timeout = newTimeout;
- }
-
- /** {@inheritDoc} */
- @Override public Peer leader() {
- return leader;
- }
-
- /** {@inheritDoc} */
- @Override public List<Peer> peers() {
- return peers;
- }
-
- /** {@inheritDoc} */
- @Override public List<Peer> learners() {
- return learners;
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> refreshLeader() {
- GetLeaderRequest req = factory.getLeaderRequest().groupId(groupId).build();
-
- CompletableFuture<GetLeaderResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(randomNode(), req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- leader = resp.leader();
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
- GetPeersRequest req = factory.getPeersRequest().onlyAlive(onlyAlive).groupId(groupId).build();
-
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> refreshMembers(onlyAlive));
-
- CompletableFuture<GetPeersResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- peers = resp.peers();
- learners = resp.learners();
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> addPeers(List<Peer> peers) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> addPeers(peers));
-
- AddPeersRequest req = factory.addPeersRequest().groupId(groupId).peers(peers).build();
-
- CompletableFuture<ChangePeersResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- this.peers = resp.newPeers();
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> removePeers(List<Peer> peers) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> removePeers(peers));
-
- RemovePeersRequest req = factory.removePeersRequest().groupId(groupId).peers(peers).build();
-
- CompletableFuture<ChangePeersResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- this.peers = resp.newPeers();
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> addLearners(List<Peer> learners) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> addLearners(learners));
-
- AddLearnersRequest req = factory.addLearnersRequest().groupId(groupId).learners(learners).build();
-
- CompletableFuture<ChangePeersResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- this.learners = resp.newPeers();
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> removeLearners(List<Peer> learners) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> removeLearners(learners));
-
- RemoveLearnersRequest req = factory.removeLearnersRequest().groupId(groupId).learners(learners).build();
-
- CompletableFuture<ChangePeersResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- this.learners = resp.newPeers();
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> snapshot(Peer peer) {
- SnapshotRequest req = factory.snapshotRequest().groupId(groupId).build();
-
- // Disable the timeout for a snapshot request.
- CompletableFuture<NetworkMessage> fut = cluster.messagingService().invoke(peer.address(), req, Integer.MAX_VALUE);
-
- return fut.thenApply(resp -> {
- if (resp != null) {
- RaftErrorResponse resp0 = (RaftErrorResponse) resp;
-
- if (resp0.errorCode() != null)
- throw new RaftException(resp0.errorCode(), resp0.errorMessage());
- }
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> transferLeadership(Peer newLeader) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> transferLeadership(newLeader));
-
- TransferLeadershipRequest req = factory.transferLeadershipRequest().groupId(groupId).newLeader(newLeader).build();
-
- CompletableFuture<?> fut = cluster.messagingService().invoke(newLeader.address(), req, timeout);
-
- return fut.thenApply(resp -> null);
- }
-
- /** {@inheritDoc} */
- @Override public <R> CompletableFuture<R> run(Command cmd) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> run(cmd));
-
- ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(true).build();
-
- CompletableFuture<ActionResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> (R) resp.result());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
- ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
-
- CompletableFuture<?> fut = cluster.messagingService().invoke(peer.address(), req, timeout);
-
- return fut.thenApply(resp -> (R) ((ActionResponse) resp).result());
- }
-
- /** {@inheritDoc} */
- @Override public void shutdown() {
- IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
- }
-
- /**
- * Retries a request until success or timeout.
- *
- * @param peer Target peer.
- * @param req The request.
- * @param stopTime Stop time.
- * @param fut The future.
- * @param <R> Response type.
- */
- private <R> void sendWithRetry(Peer peer, Object req, long stopTime, CompletableFuture<R> fut) {
- if (currentTimeMillis() >= stopTime) {
- fut.completeExceptionally(new TimeoutException());
-
- return;
- }
-
- CompletableFuture<?> fut0 = cluster.messagingService().invoke(peer.address(), (NetworkMessage) req, timeout);
-
- fut0.whenComplete(new BiConsumer<Object, Throwable>() {
- @Override public void accept(Object resp, Throwable err) {
- if (err != null) {
- if (recoverable(err)) {
- executor.schedule(() -> {
- sendWithRetry(randomNode(), req, stopTime, fut);
-
- return null;
- }, retryDelay, TimeUnit.MILLISECONDS);
- }
- else
- fut.completeExceptionally(err);
- }
- else if (resp instanceof RaftErrorResponse) {
- RaftErrorResponse resp0 = (RaftErrorResponse) resp;
-
- if (resp0.errorCode() == null) { // Handle OK response.
- leader = peer; // The OK response was received from a leader.
-
- fut.complete(null); // Void response.
- }
- else if (resp0.errorCode().equals(NO_LEADER)) {
- executor.schedule(() -> {
- sendWithRetry(randomNode(), req, stopTime, fut);
-
- return null;
- }, retryDelay, TimeUnit.MILLISECONDS);
- }
- else if (resp0.errorCode().equals(LEADER_CHANGED)) {
- leader = resp0.newLeader(); // Update a leader.
-
- executor.schedule(() -> {
- sendWithRetry(resp0.newLeader(), req, stopTime, fut);
-
- return null;
- }, retryDelay, TimeUnit.MILLISECONDS);
- }
- else
- fut.completeExceptionally(new RaftException(resp0.errorCode(), resp0.errorMessage()));
- }
- else {
- leader = peer; // The OK response was received from a leader.
-
- fut.complete((R) resp);
- }
- }
- });
- }
-
- /**
- * Checks if an error is recoverable, for example, {@link java.net.ConnectException}.
- * @param t The throwable.
- * @return {@code True} if this is a recoverable exception.
- */
- private boolean recoverable(Throwable t) {
- return t.getCause() instanceof IOException;
- }
-
- /**
- * @return Random node.
- */
- private Peer randomNode() {
- List<Peer> peers0 = peers;
-
- if (peers0 == null || peers0.isEmpty())
- return null;
-
- return peers0.get(current().nextInt(peers0.size()));
- }
-}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
deleted file mode 100644
index a778ea9..0000000
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.client.service;
-
-import java.net.ConnectException;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.MessagingService;
-import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.RaftErrorCode;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.apache.ignite.raft.client.exception.RaftException;
-import org.apache.ignite.raft.client.message.ActionRequest;
-import org.apache.ignite.raft.client.message.GetLeaderRequest;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
-import org.apache.ignite.raft.client.message.SnapshotRequest;
-import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
-import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.concurrent.CompletableFuture.failedFuture;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Mockito.when;
-
-/**
- * Test methods of raft group service.
- */
-@ExtendWith(MockitoExtension.class)
-public class RaftGroupServiceTest {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(RaftGroupServiceTest.class);
-
- /** */
- private static final List<Peer> NODES = Stream.of(20000, 20001, 20002)
- .map(port -> new NetworkAddress("localhost", port))
- .map(Peer::new)
- .collect(Collectors.toUnmodifiableList());
-
- /** */
- private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
-
- /** */
- private volatile Peer leader = NODES.get(0);
-
- /** Call timeout. */
- private static final int TIMEOUT = 1000;
-
- /** Retry delay. */
- private static final int DELAY = 200;
-
- /** Mock cluster. */
- @Mock
- private ClusterService cluster;
-
- /** Mock messaging service */
- @Mock
- private MessagingService messagingService;
-
- /**
- * @param testInfo Test info.
- */
- @BeforeEach
- void before(TestInfo testInfo) {
- when(cluster.messagingService()).thenReturn(messagingService);
-
- LOG.info(">>>> Starting test {}", testInfo.getTestMethod().orElseThrow().getName());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testRefreshLeaderStable() throws Exception {
- String groupId = "test";
-
- mockLeaderRequest(false);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
-
- assertNull(service.leader());
-
- service.refreshLeader().get();
-
- assertEquals(leader, service.leader());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testRefreshLeaderNotElected() throws Exception {
- String groupId = "test";
-
- mockLeaderRequest(false);
-
- // Simulate running elections.
- leader = null;
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
-
- assertNull(service.leader());
-
- try {
- service.refreshLeader().get();
-
- fail("Should fail");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testRefreshLeaderElectedAfterDelay() throws Exception {
- String groupId = "test";
-
- mockLeaderRequest(false);
-
- // Simulate running elections.
- leader = null;
-
- Timer timer = new Timer();
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- leader = NODES.get(0);
- }
- }, 500);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
-
- assertNull(service.leader());
-
- service.refreshLeader().get();
-
- assertEquals(NODES.get(0), service.leader());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testRefreshLeaderWithTimeout() throws Exception {
- String groupId = "test";
-
- mockLeaderRequest(true);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
-
- try {
- service.refreshLeader().get(500, TimeUnit.MILLISECONDS);
-
- fail();
- }
- catch (TimeoutException e) {
- // Expected.
- }
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLeaderElected() throws Exception {
- String groupId = "test";
-
- mockLeaderRequest(false);
- mockUserInput(false, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
-
- service.refreshLeader().get();
-
- TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
-
- assertNotNull(resp);
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLazyInitLeader() throws Exception {
- String groupId = "test";
-
- mockLeaderRequest(false);
- mockUserInput(false, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
-
- assertNull(service.leader());
-
- TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
-
- assertNotNull(resp);
-
- assertEquals(leader, service.leader());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestWithTimeout() throws Exception {
- String groupId = "test";
-
- mockLeaderRequest(false);
- mockUserInput(true, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
-
- try {
- service.run(new TestCommand()).get(500, TimeUnit.MILLISECONDS);
-
- fail();
- }
- catch (TimeoutException e) {
- // Expected.
- }
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLeaderNotElected() throws Exception {
- String groupId = "test";
-
- mockLeaderRequest(false);
- mockUserInput(false, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY).get(3, TimeUnit.SECONDS);
-
- Peer leader = this.leader;
-
- assertEquals(leader, service.leader());
-
- this.leader = null;
-
- assertEquals(leader, service.leader());
-
- try {
- service.run(new TestCommand()).get();
-
- fail("Expecting timeout");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLeaderElectedAfterDelay() throws Exception {
- String groupId = "test";
-
- mockLeaderRequest(false);
- mockUserInput(false, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY).get(3, TimeUnit.SECONDS);
-
- Peer leader = this.leader;
-
- assertEquals(leader, service.leader());
-
- this.leader = null;
-
- assertEquals(leader, service.leader());
-
- Timer timer = new Timer();
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- RaftGroupServiceTest.this.leader = NODES.get(0);
- }
- }, 500);
-
- TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
-
- assertNotNull(resp);
-
- assertEquals(NODES.get(0), service.leader());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLeaderElectedAfterDelayWithFailedNode() throws Exception {
- String groupId = "test";
-
- mockLeaderRequest(false);
- mockUserInput(false, NODES.get(0));
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT * 3, NODES, true, DELAY).get(3, TimeUnit.SECONDS);
-
- Peer leader = this.leader;
-
- assertEquals(leader, service.leader());
-
- this.leader = null;
-
- assertEquals(leader, service.leader());
-
- Timer timer = new Timer();
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- LOG.info("Set leader {}", NODES.get(1));
-
- RaftGroupServiceTest.this.leader = NODES.get(1);
- }
- }, 500);
-
- TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
-
- assertNotNull(resp);
-
- assertEquals(NODES.get(1), service.leader());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLeaderChanged() throws Exception {
- String groupId = "test";
-
- mockLeaderRequest(false);
- mockUserInput(false, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY).get(3, TimeUnit.SECONDS);
-
- Peer leader = this.leader;
-
- assertEquals(leader, service.leader());
-
- Peer newLeader = NODES.get(1);
-
- this.leader = newLeader;
-
- assertEquals(leader, service.leader());
- assertNotEquals(leader, newLeader);
-
- // Runs the command on an old leader. It should respond with leader changed error, when transparently retry.
- TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
-
- assertNotNull(resp);
-
- assertEquals(newLeader, service.leader());
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testSnapshotExecutionException() throws Exception {
- String groupId = "test";
-
- mockSnapshotRequest(1);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
-
- var addr = new NetworkAddress("localhost", 8082);
-
- CompletableFuture<Void> fut = service.snapshot(new Peer(addr));
-
- try {
- fut.get();
-
- fail();
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof IgniteInternalException);
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testSnapshotExecutionFailedResponse() throws Exception {
- String groupId = "test";
-
- mockSnapshotRequest(0);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
-
- var addr = new NetworkAddress("localhost", 8082);
-
- CompletableFuture<Void> fut = service.snapshot(new Peer(addr));
-
- try {
- fut.get();
-
- fail();
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof RaftException);
- }
- }
-
- /**
- * @param delay {@code True} to create a delay before response.
- * @param peer Fail the request targeted to given peer.
- */
- private void mockUserInput(boolean delay, @Nullable Peer peer) {
- when(messagingService.invoke(
- any(NetworkAddress.class),
- argThat(new ArgumentMatcher<ActionRequest>() {
- @Override public boolean matches(ActionRequest arg) {
- return arg.command() instanceof TestCommand;
- }
- }),
- anyLong()
- )).then(invocation -> {
- NetworkAddress target = invocation.getArgument(0);
-
- if (peer != null && target.equals(peer.address()))
- return failedFuture(new IgniteInternalException(new ConnectException()));
-
- if (delay) {
- return CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {
- fail();
- }
-
- return FACTORY.actionResponse().result(new TestResponse()).build();
- });
- }
-
- Object resp;
-
- if (leader == null)
- resp = FACTORY.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build();
- else if (target != leader.address())
- resp = FACTORY.raftErrorResponse().errorCode(RaftErrorCode.LEADER_CHANGED).newLeader(leader).build();
- else
- resp = FACTORY.actionResponse().result(new TestResponse()).build();
-
- return completedFuture(resp);
- });
- }
-
- /**
- * @param delay {@code True} to delay response.
- */
- private void mockLeaderRequest(boolean delay) {
- when(messagingService.invoke(any(NetworkAddress.class), any(GetLeaderRequest.class), anyLong()))
- .then(invocation -> {
- if (delay) {
- return CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {
- fail();
- }
-
- return FACTORY.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build();
- });
- }
-
- Peer leader0 = leader;
-
- Object resp = leader0 == null ?
- FACTORY.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build() :
- FACTORY.getLeaderResponse().leader(leader0).build();
-
- return completedFuture(resp);
- });
- }
-
- /**
- * @param mode Mock mode.
- */
- private void mockSnapshotRequest(int mode) {
- when(messagingService.invoke(any(NetworkAddress.class), any(SnapshotRequest.class), anyLong()))
- .then(invocation -> {
- if (mode == 0) {
- return completedFuture(FACTORY.raftErrorResponse().errorCode(RaftErrorCode.SNAPSHOT).
- errorMessage("Failed to create a snapshot").build());
- }
- else
- return failedFuture(new IgniteInternalException("Very bad"));
- });
- }
-
- /** */
- private static class TestCommand implements WriteCommand {
- }
-
- /** */
- private static class TestResponse {
- }
-}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
index 0f55992..be8c056 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
@@ -38,8 +38,8 @@
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
-import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -71,7 +71,7 @@
.collect(Collectors.toUnmodifiableList());
/** Factory. */
- private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
+ private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
/** Network factory. */
private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
index 1d1631e..b5f6372 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
@@ -42,12 +42,12 @@
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.WriteCommand;
-import org.apache.ignite.raft.client.exception.RaftException;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
index c386e4c..52894b4 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
@@ -29,7 +29,7 @@
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
index a4c6138..c606a47 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
@@ -27,7 +27,7 @@
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestInfo;
@@ -40,7 +40,7 @@
protected static final IgniteLogger LOG = IgniteLogger.forClass(RaftServerAbstractTest.class);
/** */
- protected static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
+ protected static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
/** Network factory. */
protected static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 055118a..cd9c577 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -27,17 +27,17 @@
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
/**
* Best raft manager ever since 1982.
*/
public class Loza implements IgniteComponent {
/** Factory. */
- private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
+ private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
/** Timeout. */
private static final int TIMEOUT = 1000;
@@ -89,7 +89,7 @@
String locNodeName = clusterNetSvc.topologyService().localMember().name();
- if (nodes.stream().map(ClusterNode::name).collect(Collectors.toSet()).contains(locNodeName))
+ if (nodes.stream().anyMatch(n -> locNodeName.equals(n.name())))
raftServer.startRaftGroup(groupId, lsnr, peers);
return RaftGroupServiceImpl.start(
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
index b0cf5c3..8340c64 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
@@ -135,7 +135,6 @@
rpcServer = new IgniteRpcServer(
service,
nodeManager,
- opts.getRaftClientMessagesFactory(),
opts.getRaftMessagesFactory(),
requestExecutor
);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
index 4af485d..2744274 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
@@ -21,6 +21,8 @@
import org.apache.ignite.raft.jraft.entity.RaftOutter;
import org.apache.ignite.raft.jraft.rpc.CliRequests;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.ActionResponse;
/**
* Message group for the Raft module.
@@ -152,4 +154,19 @@
/** */
public static final short READ_INDEX_RESPONSE = 36;
}
+
+ /**
+ * Message types for Ignite actions.
+ */
+ public static final class RpcActionMessageGroup {
+ /**
+ * Message type for {@link ActionRequest}.
+ */
+ public static final short ACTION_REQUEST = 37;
+
+ /**
+ * Message type for {@link ActionResponse}.
+ */
+ public static final short ACTION_RESPONSE = 38;
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/PeerId.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/PeerId.java
index b700440..93d7020 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/PeerId.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/PeerId.java
@@ -27,6 +27,7 @@
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.StringUtils;
import org.apache.ignite.raft.jraft.util.Utils;
+import org.jetbrains.annotations.Nullable;
/**
* Represent a participant in a replicating group.
@@ -286,7 +287,16 @@
return this.idx == other.idx;
}
- public static PeerId fromPeer(Peer p) {
- return new PeerId(p.address().host(), p.address().port(), 0, p.getPriority());
+ /**
+ * Convert {@link Peer} to {@link PeerId}.
+ *
+ * @param p Peer.
+ * @return PeerId if {@code p != null}, {@code null} otherwise.
+ */
+ public static @Nullable PeerId fromPeer(@Nullable Peer p) {
+ if (p == null)
+ return null;
+ else
+ return new PeerId(p.address().host(), p.address().port(), 0, p.getPriority());
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
index c126074..8b09b02 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
@@ -266,4 +266,4 @@
RaftError(final int value) {
this.value = value;
}
-}
\ No newline at end of file
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 94c68e7..54ae2d0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -206,7 +206,6 @@
public NodeOptions() {
raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
- raftOptions.setRaftClientMessagesFactory(getRaftClientMessagesFactory());
}
/**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
index f3c4464..0c91980 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
@@ -16,7 +16,6 @@
*/
package org.apache.ignite.raft.jraft.option;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.util.Copiable;
@@ -31,12 +30,6 @@
*/
private RaftMessagesFactory raftMessagesFactory = new RaftMessagesFactory();
- /** Raft client message factory.
- * <p>
- * Has a default value for easier testing. Should always be set externally in the production code.
- */
- private RaftClientMessagesFactory raftClientMessagesFactory = new RaftClientMessagesFactory();
-
/**
* Maximum of block size per RPC
*/
@@ -292,21 +285,6 @@
this.raftMessagesFactory = raftMessagesFactory;
}
- /**
- * @return Raft client message factory.
- */
- public RaftClientMessagesFactory getRaftClientMessagesFactory() {
- return raftClientMessagesFactory;
- }
-
- /**
- * Sets the Raft client message factory.
- */
- public void setRaftClientMessagesFactory(
- RaftClientMessagesFactory raftClientMessagesFactory) {
- this.raftClientMessagesFactory = raftClientMessagesFactory;
- }
-
/** {@inheritDoc} */
@Override
public RaftOptions copy() {
@@ -329,7 +307,6 @@
raftOptions.setEnableLogEntryChecksum(this.enableLogEntryChecksum);
raftOptions.setReadOnlyOptions(this.readOnlyOptions);
raftOptions.setRaftMessagesFactory(this.raftMessagesFactory);
- raftOptions.setRaftClientMessagesFactory(this.raftClientMessagesFactory);
return raftOptions;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
index b1af8a1..7452233 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
@@ -18,7 +18,6 @@
import com.codahale.metrics.MetricRegistry;
import java.util.concurrent.ExecutorService;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
import org.apache.ignite.raft.jraft.core.NodeImpl;
@@ -31,9 +30,6 @@
/** Raft message factory. */
private RaftMessagesFactory raftMessagesFactory = new RaftMessagesFactory();
- /** Raft client message factory. */
- private RaftClientMessagesFactory raftClientMessagesFactory = new RaftClientMessagesFactory();
-
/**
* Rpc handshake timeout in milliseconds Default: 2000(1s)
*/
@@ -196,21 +192,6 @@
this.raftMessagesFactory = raftMessagesFactory;
}
- /**
- * @return Raft client message factory.
- */
- public RaftClientMessagesFactory getRaftClientMessagesFactory() {
- return raftClientMessagesFactory;
- }
-
- /**
- * Sets the Raft client message factory.
- */
- public void setRaftClientMessagesFactory(
- RaftClientMessagesFactory raftClientMessagesFactory) {
- this.raftClientMessagesFactory = raftClientMessagesFactory;
- }
-
/** {@inheritDoc} */
@Override
public String toString() {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionRequest.java
similarity index 79%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
rename to modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionRequest.java
index 9828bc7..cb7448f 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionRequest.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.client.message;
+package org.apache.ignite.raft.jraft.rpc;
-import java.io.Serializable;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
+import org.apache.ignite.raft.jraft.rpc.Message;
/**
* Submit an action to a replication group.
*/
-@Transferable(value = RaftClientMessageGroup.ACTION_REQUEST, autoSerializable = false)
-public interface ActionRequest extends NetworkMessage, Serializable {
+@Transferable(value = RaftMessageGroup.RpcActionMessageGroup.ACTION_REQUEST, autoSerializable = false)
+public interface ActionRequest extends Message {
/**
* @return Group id.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionResponse.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionResponse.java
similarity index 76%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionResponse.java
rename to modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionResponse.java
index e4eedf7..e2c1154 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionResponse.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionResponse.java
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.client.message;
+package org.apache.ignite.raft.jraft.rpc;
-import java.io.Serializable;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
+import org.apache.ignite.raft.jraft.rpc.Message;
/**
* The result of an action.
*/
-@Transferable(value = RaftClientMessageGroup.ACTION_RESPONSE, autoSerializable = false)
-public interface ActionResponse extends NetworkMessage, Serializable {
+@Transferable(value = RaftMessageGroup.RpcActionMessageGroup.ACTION_RESPONSE, autoSerializable = false)
+public interface ActionResponse extends Message {
/**
* @return A result for this request, can be of any type.
*/
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
index 7db59f9..3226453 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
@@ -14,16 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// Generated by the protocol buffer compiler. DO NOT EDIT!
-// source: rpc.proto
package org.apache.ignite.raft.jraft.rpc;
import java.util.List;
-import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.raft.jraft.entity.RaftOutter;
+import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.util.ByteString;
+import org.jetbrains.annotations.Nullable;
public final class RpcRequests {
private RpcRequests() {
@@ -40,14 +40,28 @@
@Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.ERROR_RESPONSE, autoSerializable = false)
public interface ErrorResponse extends Message {
/**
- * <code>required int32 errorCode = 1;</code>
+ * Error code.
+ *
+ * Note: despite of the naming, 0 code is the success response,
+ * see {@link RaftError#SUCCESS}.
+ *
+ * @return error code.
*/
int errorCode();
/**
- * <code>optional string errorMsg = 2;</code>
+ * Error message.
+ *
+ * @return String with error message.
*/
String errorMsg();
+
+ /**
+ * New leader id if provided, null otherwise.
+ *
+ * @return String new leader id, null otherwise.
+ */
+ @Nullable String leaderId();
}
@Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.INSTALL_SNAPSHOT_REQUEST, autoSerializable = false)
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseFactory.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseFactory.java
index 8814849..f04e411 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseFactory.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseFactory.java
@@ -73,4 +73,25 @@
eBuilder.errorMsg(String.format(fmt, args));
return eBuilder.build();
}
+
+ /**
+ * Creates an error response with parameters.
+ *
+ * @param leaderId New leader id, can be null
+ * @param msgFactory Raft message factory
+ * @param error error with raft info
+ * @param fmt message with format string
+ * @param args arguments referenced by the format specifiers in the format string
+ * @return a response instance
+ */
+ default RpcRequests.ErrorResponse newResponse(String leaderId, RaftMessagesFactory msgFactory, RaftError error, String fmt, Object... args) {
+ ErrorResponseBuilder eBuilder = msgFactory.errorResponse();
+ eBuilder.errorCode(error.getNumber());
+ eBuilder.leaderId(leaderId);
+
+ if (fmt != null)
+ eBuilder.errorMsg(String.format(fmt, args));
+
+ return eBuilder.build();
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/ActionRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
similarity index 75%
rename from modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/ActionRequestProcessor.java
rename to modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
index 7f80b80..44b3b0f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/ActionRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.raft.jraft.rpc.impl.client;
+package org.apache.ignite.raft.jraft.rpc.impl;
import java.io.Serializable;
import java.nio.ByteBuffer;
@@ -22,14 +22,14 @@
import java.util.concurrent.Executor;
import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
import org.apache.ignite.raft.client.Command;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.RaftErrorCode;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
-import org.apache.ignite.raft.client.message.ActionRequest;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
-import org.apache.ignite.raft.client.message.RaftErrorResponse;
-import org.apache.ignite.raft.client.message.RaftErrorResponseBuilder;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.ErrorResponseBuilder;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Node;
@@ -43,17 +43,15 @@
import org.apache.ignite.raft.jraft.util.BytesUtil;
import org.apache.ignite.raft.jraft.util.JDKMarshaller;
-import static org.apache.ignite.raft.jraft.JRaftUtils.addressFromEndpoint;
-
/**
* Process action request.
*/
public class ActionRequestProcessor implements RpcProcessor<ActionRequest> {
private final Executor executor;
- private final RaftClientMessagesFactory factory;
+ private final RaftMessagesFactory factory;
- public ActionRequestProcessor(Executor executor, RaftClientMessagesFactory factory) {
+ public ActionRequestProcessor(Executor executor, RaftMessagesFactory factory) {
this.executor = executor;
this.factory = factory;
}
@@ -63,7 +61,7 @@
Node node = rpcCtx.getNodeManager().get(request.groupId(), new PeerId(rpcCtx.getLocalAddress()));
if (node == null) {
- rpcCtx.sendResponse(factory.raftErrorResponse().errorCode(RaftErrorCode.ILLEGAL_STATE).build());
+ rpcCtx.sendResponse(factory.errorResponse().errorCode(RaftError.UNKNOWN.getNumber()).build());
return;
}
@@ -102,7 +100,7 @@
}).iterator());
}
catch (Exception e) {
- sendError(rpcCtx, RaftErrorCode.STATE_MACHINE, e.getMessage());
+ sendError(rpcCtx, RaftError.ESTATEMACHINE, e.getMessage());
}
}
else
@@ -127,7 +125,7 @@
}).iterator());
}
catch (Exception e) {
- sendError(rpcCtx, RaftErrorCode.STATE_MACHINE, e.getMessage());
+ sendError(rpcCtx, RaftError.ESTATEMACHINE, e.getMessage());
}
}
}
@@ -145,13 +143,13 @@
/**
* @param ctx Context.
- * @param errorCode Error code.
+ * @param error RaftError code.
* @param msg Message.
*/
- private void sendError(RpcContext ctx, RaftErrorCode errorCode, String msg) {
- RaftErrorResponse resp = factory.raftErrorResponse()
- .errorCode(errorCode)
- .errorMessage(msg)
+ private void sendError(RpcContext ctx, RaftError error, String msg) {
+ RpcRequests.ErrorResponse resp = factory.errorResponse()
+ .errorCode(error.getNumber())
+ .errorMsg(msg)
.build();
ctx.sendResponse(resp);
@@ -165,27 +163,16 @@
private void sendError(RpcContext ctx, Status status, Node node) {
RaftError raftError = status.getRaftError();
- RaftErrorCode raftErrorCode = RaftErrorCode.ILLEGAL_STATE;
+ Message response;
- PeerId newLeader = null;
+ if (raftError == RaftError.EPERM && node.getLeaderId() != null)
+ response = RaftRpcFactory.DEFAULT
+ .newResponse(node.getLeaderId().toString(), factory, RaftError.EPERM, status.getErrorMsg());
+ else
+ response = RaftRpcFactory.DEFAULT
+ .newResponse(factory, raftError, status.getErrorMsg());
- if (raftError == RaftError.EPERM) {
- newLeader = node.getLeaderId();
- PeerId myId = node.getNodeId().getPeerId();
-
- raftErrorCode = newLeader == null || myId.equals(newLeader) ?
- RaftErrorCode.NO_LEADER : RaftErrorCode.LEADER_CHANGED;
- }
- else if (status.getRaftError() == RaftError.ESTATEMACHINE)
- raftErrorCode = RaftErrorCode.STATE_MACHINE;
-
- RaftErrorResponseBuilder resp =
- factory.raftErrorResponse().errorCode(raftErrorCode).errorMessage(status.getErrorMsg());
-
- if (newLeader != null)
- resp.newLeader(new Peer(addressFromEndpoint(newLeader.getEndpoint())));
-
- ctx.sendResponse(resp.build());
+ ctx.sendResponse(response);
}
/**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 9a42dc8..164f4b3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -21,17 +21,15 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.NetworkMessageHandler;
import org.apache.ignite.network.TopologyEventHandler;
-import org.apache.ignite.raft.client.message.RaftClientMessageGroup;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.jraft.NodeManager;
-import org.apache.ignite.raft.jraft.RaftMessageGroup;
-import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.rpc.RpcContext;
import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
import org.apache.ignite.raft.jraft.rpc.RpcServer;
@@ -46,7 +44,6 @@
import org.apache.ignite.raft.jraft.rpc.impl.cli.ResetPeerRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.SnapshotRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.TransferLeaderRequestProcessor;
-import org.apache.ignite.raft.jraft.rpc.impl.client.ActionRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.GetFileRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.InstallSnapshotRequestProcessor;
@@ -71,14 +68,12 @@
/**
* @param service The cluster service.
* @param nodeManager The node manager.
- * @param raftClientMessagesFactory Client message factory.
* @param raftMessagesFactory Message factory.
* @param rpcExecutor The executor for RPC requests.
*/
public IgniteRpcServer(
ClusterService service,
NodeManager nodeManager,
- RaftClientMessagesFactory raftClientMessagesFactory,
RaftMessagesFactory raftMessagesFactory,
Executor rpcExecutor
) {
@@ -110,14 +105,11 @@
registerProcessor(new RemoveLearnersRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new ResetLearnersRequestProcessor(rpcExecutor, raftMessagesFactory));
// common client integration
- registerProcessor(new org.apache.ignite.raft.jraft.rpc.impl.client.GetLeaderRequestProcessor(rpcExecutor, raftClientMessagesFactory));
- registerProcessor(new ActionRequestProcessor(rpcExecutor, raftClientMessagesFactory));
- registerProcessor(new org.apache.ignite.raft.jraft.rpc.impl.client.SnapshotRequestProcessor(rpcExecutor, raftClientMessagesFactory));
+ registerProcessor(new ActionRequestProcessor(rpcExecutor, raftMessagesFactory));
var messageHandler = new RpcMessageHandler();
service.messagingService().addMessageHandler(RaftMessageGroup.class, messageHandler);
- service.messagingService().addMessageHandler(RaftClientMessageGroup.class, messageHandler);
service.topologyService().addEventHandler(new TopologyEventHandler() {
@Override public void onAppeared(ClusterNode member) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftException.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftException.java
new file mode 100644
index 0000000..8e21846
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.rpc.impl;
+
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.raft.jraft.error.RaftError;
+
+/**
+ * A raft exception containing code and description.
+ */
+public class RaftException extends IgniteInternalCheckedException {
+ /** Raft error. */
+ private final RaftError raftError;
+
+ /**
+ * @param raftError RaftError.
+ */
+ public RaftException(RaftError raftError) {
+ super(RaftError.describeCode(raftError.getNumber()));
+ this.raftError = raftError;
+ }
+
+ /**
+ * @param raftError RaftError..
+ * @param message Error message.
+ */
+ public RaftException(RaftError raftError, String message) {
+ super(raftError.name() + ":" + message);
+
+ this.raftError = raftError;
+ }
+
+ /**
+ * @return RaftError.
+ */
+ public RaftError raftError() {
+ return raftError;
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
new file mode 100644
index 0000000..fa5994c
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.rpc.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.ActionResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests;
+import org.jetbrains.annotations.NotNull;
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.ThreadLocalRandom.current;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
+
+/**
+ * The implementation of {@link RaftGroupService}
+ */
+public class RaftGroupServiceImpl implements RaftGroupService {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(RaftGroupServiceImpl.class);
+
+ /** */
+ private volatile long timeout;
+
+ /** */
+ private final String groupId;
+
+ /** */
+ private final RaftMessagesFactory factory;
+
+ /** */
+ private volatile Peer leader;
+
+ /** */
+ private volatile List<Peer> peers;
+
+ /** */
+ private volatile List<Peer> learners;
+
+ /** */
+ private final ClusterService cluster;
+
+ /** */
+ private final long retryDelay;
+
+ /** */
+ private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+
+ /**
+ * Constructor.
+ *
+ * @param groupId Group id.
+ * @param cluster A cluster.
+ * @param factory A message factory.
+ * @param timeout Request timeout.
+ * @param peers Initial group configuration.
+ * @param leader Group leader.
+ * @param retryDelay Retry delay.
+ */
+ private RaftGroupServiceImpl(
+ String groupId,
+ ClusterService cluster,
+ RaftMessagesFactory factory,
+ int timeout,
+ List<Peer> peers,
+ Peer leader,
+ long retryDelay
+ ) {
+ this.cluster = requireNonNull(cluster);
+ this.peers = requireNonNull(peers);
+ this.learners = Collections.emptyList();
+ this.factory = factory;
+ this.timeout = timeout;
+ this.groupId = groupId;
+ this.retryDelay = retryDelay;
+ this.leader = leader;
+ }
+
+ /**
+ * Starts raft group service.
+ *
+ * @param groupId Raft group id.
+ * @param cluster Cluster service.
+ * @param factory Message factory.
+ * @param timeout Timeout.
+ * @param peers List of all peers.
+ * @param getLeader {@code True} to get the group's leader upon service creation.
+ * @param retryDelay Retry delay.
+ * @return Future representing pending completion of the operation.
+ */
+ public static CompletableFuture<RaftGroupService> start(
+ String groupId,
+ ClusterService cluster,
+ RaftMessagesFactory factory,
+ int timeout,
+ List<Peer> peers,
+ boolean getLeader,
+ long retryDelay
+ ) {
+ var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, peers, null, retryDelay);
+
+ if (!getLeader)
+ return CompletableFuture.completedFuture(service);
+
+ return service.refreshLeader().handle((unused, throwable) -> {
+ if (throwable != null)
+ LOG.error("Failed to refresh a leader", throwable);
+
+ return service;
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull String groupId() {
+ return groupId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long timeout() {
+ return timeout;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void timeout(long newTimeout) {
+ this.timeout = newTimeout;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Peer leader() {
+ return leader;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Peer> peers() {
+ return peers;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Peer> learners() {
+ return learners;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> refreshLeader() {
+ GetLeaderRequest req = factory.getLeaderRequest().groupId(groupId).build();
+
+ CompletableFuture<GetLeaderResponse> fut = new CompletableFuture<>();
+
+ sendWithRetry(randomNode(), req, currentTimeMillis() + timeout, fut);
+
+ return fut.thenApply(resp -> {
+ leader = parsePeer(resp.leaderId());
+
+ return null;
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
+ GetPeersRequest req = factory.getPeersRequest().onlyAlive(onlyAlive).groupId(groupId).build();
+
+ Peer leader = this.leader;
+
+ if (leader == null)
+ return refreshLeader().thenCompose(res -> refreshMembers(onlyAlive));
+
+ CompletableFuture<GetPeersResponse> fut = new CompletableFuture<>();
+
+ sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
+
+ return fut.thenApply(resp -> {
+ peers = parsePeerList(resp.peersList());
+ learners = parsePeerList(resp.learnersList());
+
+ return null;
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> addPeer(Peer peer) {
+ Peer leader = this.leader;
+
+ if (leader == null)
+ return refreshLeader().thenCompose(res -> addPeer(peer));
+
+ AddPeerRequest req = factory.addPeerRequest().groupId(groupId).peerId(PeerId.fromPeer(peer).toString()).build();
+
+ CompletableFuture<AddPeerResponse> fut = new CompletableFuture<>();
+
+ sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
+
+ return fut.thenApply(resp -> {
+ this.peers = parsePeerList(resp.newPeersList());
+
+ return null;
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> removePeer(Peer peer) {
+ Peer leader = this.leader;
+
+ if (leader == null)
+ return refreshLeader().thenCompose(res -> removePeer(peer));
+
+ RemovePeerRequest req = factory.removePeerRequest().groupId(groupId).peerId(PeerId.fromPeer(peer).toString()).build();
+
+ CompletableFuture<RemovePeerResponse> fut = new CompletableFuture<>();
+
+ sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
+
+ return fut.thenApply(resp -> {
+ this.peers = parsePeerList(resp.newPeersList());
+
+ return null;
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> changePeers(List<Peer> peers) {
+ Peer leader = this.leader;
+
+ if (leader == null)
+ return refreshLeader().thenCompose(res -> changePeers(peers));
+
+ List<String> peersToChange = peers.stream().map(p -> PeerId.fromPeer(p).toString())
+ .collect(Collectors.toList());
+
+ ChangePeersRequest req = factory.changePeersRequest().groupId(groupId)
+ .newPeersList(peersToChange).build();
+
+ CompletableFuture<ChangePeersResponse> fut = new CompletableFuture<>();
+
+ sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
+
+ return fut.thenApply(resp -> {
+ this.peers = parsePeerList(resp.newPeersList());
+
+ return null;
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> addLearners(List<Peer> learners) {
+ Peer leader = this.leader;
+
+ if (leader == null)
+ return refreshLeader().thenCompose(res -> addLearners(learners));
+
+ List<String> lrns = learners.stream().map(p -> PeerId.fromPeer(p).toString()).collect(Collectors.toList());
+ AddLearnersRequest req = factory.addLearnersRequest().groupId(groupId).learnersList(lrns).build();
+
+ CompletableFuture<LearnersOpResponse> fut = new CompletableFuture<>();
+
+ sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
+
+ return fut.thenApply(resp -> {
+ this.learners = parsePeerList(resp.newLearnersList());
+
+ return null;
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> removeLearners(List<Peer> learners) {
+ Peer leader = this.leader;
+
+ if (leader == null)
+ return refreshLeader().thenCompose(res -> addLearners(learners));
+
+ List<String> lrns = learners.stream().map(p -> PeerId.fromPeer(p).toString()).collect(Collectors.toList());
+ RemoveLearnersRequest req = factory.removeLearnersRequest().groupId(groupId).learnersList(lrns).build();
+
+ CompletableFuture<LearnersOpResponse> fut = new CompletableFuture<>();
+
+ sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
+
+ return fut.thenApply(resp -> {
+ this.learners = parsePeerList(resp.newLearnersList());
+
+ return null;
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> resetLearners(List<Peer> learners) {
+ Peer leader = this.leader;
+
+ if (leader == null)
+ return refreshLeader().thenCompose(res -> resetLearners(learners));
+
+ List<String> lrns = learners.stream().map(p -> PeerId.fromPeer(p).toString()).collect(Collectors.toList());
+ ResetLearnersRequest req = factory.resetLearnersRequest().groupId(groupId).learnersList(lrns).build();
+
+ CompletableFuture<LearnersOpResponse> fut = new CompletableFuture<>();
+
+ sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
+
+ return fut.thenApply(resp -> {
+ this.learners = parsePeerList(resp.newLearnersList());
+
+ return null;
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> snapshot(Peer peer) {
+ SnapshotRequest req = factory.snapshotRequest().groupId(groupId).build();
+
+ // Disable the timeout for a snapshot request.
+ CompletableFuture<NetworkMessage> fut = cluster.messagingService().invoke(peer.address(), req, Integer.MAX_VALUE);
+
+ return fut.thenCompose(resp -> {
+ if (resp != null) {
+ RpcRequests.ErrorResponse resp0 = (RpcRequests.ErrorResponse) resp;
+
+ if (resp0.errorCode() != RaftError.SUCCESS.getNumber())
+ return CompletableFuture.failedFuture(new RaftException(RaftError.forNumber(resp0.errorCode()), resp0.errorMsg()));
+ }
+
+ return CompletableFuture.completedFuture(null);
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> transferLeadership(Peer newLeader) {
+ Peer leader = this.leader;
+
+ if (leader == null)
+ return refreshLeader().thenCompose(res -> transferLeadership(newLeader));
+
+ TransferLeaderRequest req = factory.transferLeaderRequest()
+ .groupId(groupId).leaderId(PeerId.fromPeer(newLeader).toString()).build();
+
+ CompletableFuture<NetworkMessage> fut = cluster.messagingService().invoke(newLeader.address(), req, timeout);
+
+ return fut.thenCompose(resp -> {
+ if (resp != null) {
+ RpcRequests.ErrorResponse resp0 = (RpcRequests.ErrorResponse) resp;
+
+ if (resp0.errorCode() != RaftError.SUCCESS.getNumber())
+ CompletableFuture.failedFuture(
+ new RaftException(
+ RaftError.forNumber(resp0.errorCode()), resp0.errorMsg()
+ )
+ );
+ else
+ this.leader = newLeader;
+ }
+
+ return CompletableFuture.completedFuture(null);
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> CompletableFuture<R> run(Command cmd) {
+ Peer leader = this.leader;
+
+ if (leader == null)
+ return refreshLeader().thenCompose(res -> run(cmd));
+
+ ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(true).build();
+
+ CompletableFuture<ActionResponse> fut = new CompletableFuture<>();
+
+ sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
+
+ return fut.thenApply(resp -> (R) resp.result());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
+ ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
+
+ CompletableFuture<?> fut = cluster.messagingService().invoke(peer.address(), req, timeout);
+
+ return fut.thenApply(resp -> (R) ((ActionResponse) resp).result());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void shutdown() {
+ IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Retries a request until success or timeout.
+ *
+ * @param peer Target peer.
+ * @param req The request.
+ * @param stopTime Stop time.
+ * @param fut The future.
+ * @param <R> Response type.
+ */
+ private <R> void sendWithRetry(Peer peer, Object req, long stopTime, CompletableFuture<R> fut) {
+ if (currentTimeMillis() >= stopTime) {
+ fut.completeExceptionally(new TimeoutException());
+
+ return;
+ }
+
+ CompletableFuture<?> fut0 = cluster.messagingService().invoke(peer.address(), (NetworkMessage) req, timeout);
+
+ fut0.whenComplete(new BiConsumer<Object, Throwable>() {
+ @Override public void accept(Object resp, Throwable err) {
+ if (err != null) {
+ if (recoverable(err)) {
+ executor.schedule(() -> {
+ sendWithRetry(randomNode(), req, stopTime, fut);
+
+ return null;
+ }, retryDelay, TimeUnit.MILLISECONDS);
+ }
+ else
+ fut.completeExceptionally(err);
+ }
+ else if (resp instanceof RpcRequests.ErrorResponse) {
+ RpcRequests.ErrorResponse resp0 = (RpcRequests.ErrorResponse) resp;
+
+ if (resp0.errorCode() == RaftError.SUCCESS.getNumber()) { // Handle OK response.
+ leader = peer; // The OK response was received from a leader.
+
+ fut.complete(null); // Void response.
+ }
+ else if (resp0.errorCode() == RaftError.EBUSY.getNumber() ||
+ resp0.errorCode() == (RaftError.EAGAIN.getNumber())) {
+ executor.schedule(() -> {
+ sendWithRetry(peer, req, stopTime, fut);
+
+ return null;
+ }, retryDelay, TimeUnit.MILLISECONDS);
+ }
+ else if (resp0.errorCode() == RaftError.EPERM.getNumber()) {
+ if (resp0.leaderId() == null) {
+ executor.schedule(() -> {
+ sendWithRetry(randomNode(), req, stopTime, fut);
+
+ return null;
+ }, retryDelay, TimeUnit.MILLISECONDS);
+ }
+ else {
+ leader = parsePeer(resp0.leaderId()); // Update a leader.
+
+ executor.schedule(() -> {
+ sendWithRetry(leader, req, stopTime, fut);
+
+ return null;
+ }, retryDelay, TimeUnit.MILLISECONDS);
+
+ }
+ }
+ else
+ fut.completeExceptionally(
+ new RaftException(RaftError.forNumber(resp0.errorCode()), resp0.errorMsg()));
+ }
+ else {
+ leader = peer; // The OK response was received from a leader.
+
+ fut.complete((R) resp);
+ }
+ }
+ });
+ }
+
+ /**
+ * Checks if an error is recoverable, for example, {@link java.net.ConnectException}.
+ * @param t The throwable.
+ * @return {@code True} if this is a recoverable exception.
+ */
+ private boolean recoverable(Throwable t) {
+ return t.getCause() instanceof IOException;
+ }
+
+ /**
+ * @return Random node.
+ */
+ private Peer randomNode() {
+ List<Peer> peers0 = peers;
+
+ if (peers0 == null || peers0.isEmpty())
+ return null;
+
+ return peers0.get(current().nextInt(peers0.size()));
+ }
+
+ /**
+ * Parse {@link Peer} from string representation of {@link PeerId}.
+ *
+ * @param peerId String representation of {@link PeerId}
+ * @return Peer
+ */
+ // TODO: Remove after IGNITE-15506
+ private static Peer parsePeer(String peerId) {
+ return peerFromPeerId(PeerId.parsePeer(peerId));
+ }
+
+ /**
+ * Creates new {@link Peer} from {@link PeerId}.
+ *
+ * @param peer PeerId
+ * @return {@link Peer}
+ */
+ private static Peer peerFromPeerId(PeerId peer) {
+ if (peer == null)
+ return null;
+ else
+ return new Peer(NetworkAddress.from(peer.getEndpoint().getIp() + ":" + peer.getEndpoint().getPort()));
+ }
+
+ /**
+ * Parse list of {@link PeerId} from list with string representations.
+ *
+ * @param peers List of {@link PeerId} string representations.
+ * @return List of {@link PeerId}
+ */
+ private List<Peer> parsePeerList(List<String> peers) {
+ if (peers == null)
+ return null;
+
+ List<Peer> res = new ArrayList<>(peers.size());
+
+ for (String peer: peers)
+ res.add(parsePeer(peer));
+
+ return res;
+ }
+
+ /**
+ * Convert list of {@link PeerId} to list of {@link Peer}.
+ *
+ * @param peers List of {@link PeerId}
+ * @return List of {@link Peer}
+ */
+ private List<Peer> convertPeerIdList(List<PeerId> peers) {
+ if (peers == null)
+ return Collections.emptyList();
+
+ List<Peer> res = new ArrayList<>(peers.size());
+
+ for (PeerId peerId: peers)
+ res.add(peerFromPeerId(peerId));
+
+ return res;
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AddLearnersRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AddLearnersRequestProcessor.java
index 744c5f4..7799da6 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AddLearnersRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AddLearnersRequestProcessor.java
@@ -26,7 +26,6 @@
import org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
/**
* AddLearners request processor.
@@ -50,7 +49,7 @@
@Override
protected Message processRequest0(final CliRequestContext ctx, final AddLearnersRequest request,
- final RpcRequestClosure done) {
+ final IgniteCliRpcRequestClosure done) {
final List<PeerId> oldLearners = ctx.node.listLearners();
final List<PeerId> addingLearners = new ArrayList<>();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AddPeerRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AddPeerRequestProcessor.java
index f438831..79d9643 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AddPeerRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AddPeerRequestProcessor.java
@@ -26,7 +26,6 @@
import org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
/**
* AddPeer request processor.
@@ -49,7 +48,7 @@
@Override
protected Message processRequest0(final CliRequestContext ctx, final AddPeerRequest request,
- final RpcRequestClosure done) {
+ final IgniteCliRpcRequestClosure done) {
final List<PeerId> oldPeers = ctx.node.listPeers();
final String addingPeerIdStr = request.peerId();
final PeerId addingPeer = new PeerId();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/BaseCliRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/BaseCliRequestProcessor.java
index af46fff..fd6308a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/BaseCliRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/BaseCliRequestProcessor.java
@@ -59,7 +59,7 @@
/**
* Process the request with CliRequestContext
*/
- protected abstract Message processRequest0(CliRequestContext ctx, T request, RpcRequestClosure done);
+ protected abstract Message processRequest0(CliRequestContext ctx, T request, IgniteCliRpcRequestClosure done);
/**
* Cli request context
@@ -110,7 +110,7 @@
.newResponse(msgFactory(), st.getCode(), st.getErrorMsg());
}
else {
- return processRequest0(new CliRequestContext(node, groupId, peerId), request, done);
+ return processRequest0(new CliRequestContext(node, groupId, peerId), request, new IgniteCliRpcRequestClosure(node, done));
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersRequestProcessor.java
index 43eb4c6..e10c063 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersRequestProcessor.java
@@ -26,7 +26,6 @@
import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
import static java.util.stream.Collectors.toList;
@@ -51,7 +50,7 @@
@Override
protected Message processRequest0(final CliRequestContext ctx, final ChangePeersRequest request,
- final RpcRequestClosure done) {
+ final IgniteCliRpcRequestClosure done) {
final List<PeerId> oldConf = ctx.node.listPeers();
final Configuration conf = new Configuration();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
index e3062e8..f8be228 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
@@ -50,7 +50,7 @@
@Override
protected Message processRequest0(final CliRequestContext ctx, final GetLeaderRequest request,
- final RpcRequestClosure done) {
+ final IgniteCliRpcRequestClosure done) {
// ignore
return null;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetPeersRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetPeersRequestProcessor.java
index 772da39..1c63139 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetPeersRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetPeersRequestProcessor.java
@@ -22,7 +22,6 @@
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
import org.apache.ignite.raft.jraft.rpc.Message;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
import static java.util.stream.Collectors.toList;
@@ -47,7 +46,7 @@
@Override
protected Message processRequest0(final CliRequestContext ctx, final GetPeersRequest request,
- final RpcRequestClosure done) {
+ final IgniteCliRpcRequestClosure done) {
final List<PeerId> peers;
final List<PeerId> learners;
if (request.onlyAlive()) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/IgniteCliRpcRequestClosure.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/IgniteCliRpcRequestClosure.java
new file mode 100644
index 0000000..42f8c2a
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/IgniteCliRpcRequestClosure.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.rpc.impl.cli;
+
+import org.apache.ignite.raft.jraft.Closure;
+import org.apache.ignite.raft.jraft.Node;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests;
+
+/**
+ * Ignite wrapper for {@link RpcRequestClosure}.
+ *
+ * The main purpose: provide current leader id in error response
+ * if any {@link RaftError#EPERM} issues with the current node.
+ */
+public class IgniteCliRpcRequestClosure implements Closure {
+ /** Node, which call this closure. */
+ private final Node node;
+
+ /** Original closure. */
+ private final RpcRequestClosure delegate;
+
+ /** Creates new closure. */
+ IgniteCliRpcRequestClosure(Node node, RpcRequestClosure closure) {
+ this.node = node;
+ this.delegate = closure;
+ }
+
+ /**
+ * @see RpcRequestClosure#getRpcCtx()
+ */
+ public RpcContext getRpcCtx() {
+ return delegate.getRpcCtx();
+ }
+
+ /**
+ * @see RpcRequestClosure#getMsgFactory()
+ */
+ public RaftMessagesFactory getMsgFactory() {
+ return delegate.getMsgFactory();
+ }
+
+ /**
+ * @see RpcRequestClosure#sendResponse(Message)
+ */
+ public void sendResponse(Message msg) {
+ if (msg instanceof RpcRequests.ErrorResponse) {
+ RpcRequests.ErrorResponse err = (RpcRequests.ErrorResponse) msg;
+
+ PeerId newLeader;
+
+ if (err.errorCode() == RaftError.EPERM.getNumber()) {
+ newLeader = node.getLeaderId();
+
+ delegate.sendResponse(
+ RaftRpcFactory.DEFAULT
+ .newResponse(newLeader.toString(), getMsgFactory(), RaftError.EPERM, err.errorMsg()));
+ return;
+ }
+ }
+
+ delegate.sendResponse(msg);
+ }
+
+ /**
+ * @see RpcRequestClosure#run(Status)
+ */
+ @Override public void run(Status status) {
+ sendResponse(RaftRpcFactory.DEFAULT.newResponse(getMsgFactory(), status));
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/RemoveLearnersRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/RemoveLearnersRequestProcessor.java
index 4f83b7a..55624a9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/RemoveLearnersRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/RemoveLearnersRequestProcessor.java
@@ -26,7 +26,6 @@
import org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
/**
* RemoveLearners request processor.
@@ -49,7 +48,7 @@
@Override
protected Message processRequest0(final CliRequestContext ctx, final RemoveLearnersRequest request,
- final RpcRequestClosure done) {
+ final IgniteCliRpcRequestClosure done) {
final List<PeerId> oldLearners = ctx.node.listLearners();
final List<PeerId> removeingLearners = new ArrayList<>(request.learnersList().size());
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/RemovePeerRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/RemovePeerRequestProcessor.java
index a0765e3..bda8b08 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/RemovePeerRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/RemovePeerRequestProcessor.java
@@ -26,7 +26,6 @@
import org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
/**
* Remove peer request processor.
@@ -49,7 +48,7 @@
@Override
protected Message processRequest0(final CliRequestContext ctx, final RemovePeerRequest request,
- final RpcRequestClosure done) {
+ final IgniteCliRpcRequestClosure done) {
final List<PeerId> oldPeers = ctx.node.listPeers();
final String removingPeerIdStr = request.peerId();
final PeerId removingPeer = new PeerId();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ResetLearnersRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ResetLearnersRequestProcessor.java
index f7f6855..e263ed7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ResetLearnersRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ResetLearnersRequestProcessor.java
@@ -26,7 +26,6 @@
import org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
import static java.util.stream.Collectors.toList;
@@ -51,7 +50,7 @@
@Override
protected Message processRequest0(final CliRequestContext ctx, final ResetLearnersRequest request,
- final RpcRequestClosure done) {
+ final IgniteCliRpcRequestClosure done) {
final List<PeerId> oldLearners = ctx.node.listLearners();
final List<PeerId> newLearners = new ArrayList<>(request.learnersList().size());
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ResetPeerRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ResetPeerRequestProcessor.java
index cd39da5..54abf2e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ResetPeerRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ResetPeerRequestProcessor.java
@@ -25,7 +25,6 @@
import org.apache.ignite.raft.jraft.rpc.CliRequests.ResetPeerRequest;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
/**
* Reset peer request processor.
@@ -48,7 +47,7 @@
@Override
protected Message processRequest0(final CliRequestContext ctx, final ResetPeerRequest request,
- final RpcRequestClosure done) {
+ final IgniteCliRpcRequestClosure done) {
final Configuration newConf = new Configuration();
for (final String peerIdStr : request.newPeersList()) {
final PeerId peer = new PeerId();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/SnapshotRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/SnapshotRequestProcessor.java
index daa6673..1adb71f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/SnapshotRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/SnapshotRequestProcessor.java
@@ -20,7 +20,6 @@
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
import org.apache.ignite.raft.jraft.rpc.Message;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
/**
* Snapshot request processor.
@@ -43,7 +42,7 @@
@Override
protected Message processRequest0(final CliRequestContext ctx, final SnapshotRequest request,
- final RpcRequestClosure done) {
+ final IgniteCliRpcRequestClosure done) {
LOG.info("Receive SnapshotRequest to {} from {}", ctx.node.getNodeId(), request.peerId());
ctx.node.snapshot(done);
return null;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/TransferLeaderRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/TransferLeaderRequestProcessor.java
index a65e83a..4842998 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/TransferLeaderRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/TransferLeaderRequestProcessor.java
@@ -24,7 +24,6 @@
import org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
/**
* Snapshot request processor.
@@ -47,7 +46,7 @@
@Override
protected Message processRequest0(final CliRequestContext ctx, final TransferLeaderRequest request,
- final RpcRequestClosure done) {
+ final IgniteCliRpcRequestClosure done) {
final PeerId peer = new PeerId();
if (request.peerId() != null && !peer.parse(request.peerId())) {
return RaftRpcFactory.DEFAULT //
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/GetLeaderRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/GetLeaderRequestProcessor.java
deleted file mode 100644
index 4723ed7..0000000
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/GetLeaderRequestProcessor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.jraft.rpc.impl.client;
-
-import java.util.concurrent.Executor;
-import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.RaftErrorCode;
-import org.apache.ignite.raft.client.message.GetLeaderRequest;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
-import org.apache.ignite.raft.jraft.Node;
-import org.apache.ignite.raft.jraft.entity.PeerId;
-import org.apache.ignite.raft.jraft.rpc.RpcContext;
-import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
-
-import static org.apache.ignite.raft.jraft.JRaftUtils.addressFromEndpoint;
-
-/**
- * Process get leader request.
- */
-public class GetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> {
- private final Executor executor;
-
- private final RaftClientMessagesFactory factory;
-
- public GetLeaderRequestProcessor(Executor executor, RaftClientMessagesFactory factory) {
- this.executor = executor;
- this.factory = factory;
- }
-
- /** {@inheritDoc} */
- @Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) {
- NetworkAddress localAddr = rpcCtx.getLocalAddress();
-
- Node node = rpcCtx.getNodeManager().get(request.groupId(), new PeerId(localAddr.host(), localAddr.port()));
-
- // There's a race between starting a raft group and requesting a leader for it.
- // Let's say that there are 3 ignite nodes {A,B,C} in the cluster and according to an affinity raft group
- // with only one raft node should be created on ignite node A. However it's possible to request given
- // raft group from another ignite node B from within {@link RaftGroupService}. So that it's possible
- // that raftGroupService will be already created and requests a leader from a not yet existing raft group.
- if (node == null) {
- rpcCtx.sendResponse(factory.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build());
-
- return;
- }
-
- PeerId leaderId = node.getLeaderId();
-
- if (leaderId == null) {
- rpcCtx.sendResponse(factory.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build());
-
- return;
- }
-
- // Find by host and port.
- Peer leader0 = new Peer(addressFromEndpoint(leaderId.getEndpoint()));
-
- rpcCtx.sendResponse(factory.getLeaderResponse().leader(leader0).build());
- }
-
- /** {@inheritDoc} */
- @Override public String interest() {
- return GetLeaderRequest.class.getName();
- }
-
- /** {@inheritDoc} */
- @Override public Executor executor() {
- return executor;
- }
-}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/SnapshotRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/SnapshotRequestProcessor.java
deleted file mode 100644
index 2c8d25f..0000000
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/SnapshotRequestProcessor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.jraft.rpc.impl.client;
-
-import java.util.concurrent.Executor;
-import org.apache.ignite.raft.client.RaftErrorCode;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
-import org.apache.ignite.raft.client.message.RaftErrorResponseBuilder;
-import org.apache.ignite.raft.client.message.SnapshotRequest;
-import org.apache.ignite.raft.jraft.Node;
-import org.apache.ignite.raft.jraft.entity.PeerId;
-import org.apache.ignite.raft.jraft.rpc.RpcContext;
-import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
-
-/**
- * Process snapshot request.
- */
-public class SnapshotRequestProcessor implements RpcProcessor<SnapshotRequest> {
- private final Executor executor;
-
- private final RaftClientMessagesFactory factory;
-
- public SnapshotRequestProcessor(Executor executor, RaftClientMessagesFactory factory) {
- this.executor = executor;
- this.factory = factory;
- }
-
- /** {@inheritDoc} */
- @Override public void handleRequest(RpcContext rpcCtx, SnapshotRequest request) {
- Node node = rpcCtx.getNodeManager().get(request.groupId(), new PeerId(rpcCtx.getLocalAddress()));
-
- if (node == null) {
- rpcCtx.sendResponse(factory.raftErrorResponse().errorCode(RaftErrorCode.ILLEGAL_STATE).build());
-
- return;
- }
-
- node.snapshot(status -> {
- RaftErrorResponseBuilder resp = factory.raftErrorResponse();
-
- if (!status.isOk())
- resp.errorCode(RaftErrorCode.SNAPSHOT).errorMessage(status.getErrorMsg());
-
- rpcCtx.sendResponse(resp.build());
- });
- }
-
- /** {@inheritDoc} */
- @Override public String interest() {
- return SnapshotRequest.class.getName();
- }
-
- /** {@inheritDoc} */
- @Override public Executor executor() {
- return executor;
- }
-}
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index d148224..bacc052 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -33,17 +33,17 @@
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.RaftErrorCode;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
-import org.apache.ignite.raft.client.message.ActionRequest;
-import org.apache.ignite.raft.client.message.GetLeaderRequest;
-import org.apache.ignite.raft.client.message.GetLeaderResponse;
-import org.apache.ignite.raft.client.message.RaftClientMessageGroup;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
-import org.apache.ignite.raft.client.message.RaftErrorResponse;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.jetbrains.annotations.Nullable;
/**
@@ -58,7 +58,7 @@
private static final IgniteLogger LOG = IgniteLogger.forClass(RaftServerImpl.class);
/** */
- private final RaftClientMessagesFactory clientMsgFactory;
+ private final RaftMessagesFactory clientMsgFactory;
/** */
private final ClusterService service;
@@ -82,7 +82,7 @@
* @param service Network service.
* @param clientMsgFactory Client message factory.
*/
- public RaftServerImpl(ClusterService service, RaftClientMessagesFactory clientMsgFactory) {
+ public RaftServerImpl(ClusterService service, RaftMessagesFactory clientMsgFactory) {
Objects.requireNonNull(service);
Objects.requireNonNull(clientMsgFactory);
@@ -96,12 +96,12 @@
/** {@inheritDoc} */
@Override public void start() {
service.messagingService().addMessageHandler(
- RaftClientMessageGroup.class,
+ RaftMessageGroup.class,
(message, senderAddr, correlationId) -> {
- if (message instanceof GetLeaderRequest) {
+ if (message instanceof CliRequests.GetLeaderRequest) {
var localPeer = new Peer(service.topologyService().localMember().address());
- GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(localPeer).build();
+ CliRequests.GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leaderId(PeerId.fromPeer(localPeer).toString()).build();
service.messagingService().send(senderAddr, resp, correlationId);
}
@@ -111,7 +111,7 @@
RaftGroupListener lsnr = listeners.get(req0.groupId());
if (lsnr == null) {
- sendError(senderAddr, correlationId, RaftErrorCode.ILLEGAL_STATE);
+ sendError(senderAddr, correlationId, RaftError.UNKNOWN);
return;
}
@@ -217,7 +217,7 @@
}
})) {
// Queue out of capacity.
- sendError(sender, corellationId, RaftErrorCode.BUSY);
+ sendError(sender, corellationId, RaftError.EBUSY);
}
}
@@ -247,8 +247,8 @@
}
}
- private void sendError(NetworkAddress sender, String corellationId, RaftErrorCode errorCode) {
- RaftErrorResponse resp = clientMsgFactory.raftErrorResponse().errorCode(errorCode).build();
+ private void sendError(NetworkAddress sender, String corellationId, RaftError error) {
+ RpcRequests.ErrorResponse resp = clientMsgFactory.errorResponse().errorCode(error.getNumber()).build();
service.messagingService().send(sender, resp, corellationId);
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
new file mode 100644
index 0000000..601fb93
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
@@ -0,0 +1,848 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.core;
+
+import java.net.ConnectException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.CliRequests;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test methods of raft group service.
+ */
+@ExtendWith(MockitoExtension.class)
+public class RaftGroupServiceTest {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(RaftGroupServiceTest.class);
+
+ /** */
+ private static final List<Peer> NODES = Stream.of(20000, 20001, 20002)
+ .map(port -> new NetworkAddress("localhost", port))
+ .map(Peer::new)
+ .collect(Collectors.toUnmodifiableList());
+
+ /** */
+ private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
+
+ /** */
+ private volatile Peer leader = NODES.get(0);
+
+ /** Call timeout. */
+ private static final int TIMEOUT = 1000;
+
+ /** Retry delay. */
+ private static final int DELAY = 200;
+
+ /** Mock cluster. */
+ @Mock
+ private ClusterService cluster;
+
+ /** Mock messaging service */
+ @Mock
+ private MessagingService messagingService;
+
+ /**
+ * @param testInfo Test info.
+ */
+ @BeforeEach
+ void before(TestInfo testInfo) {
+ when(cluster.messagingService()).thenReturn(messagingService);
+
+ LOG.info(">>>> Starting test {}", testInfo.getTestMethod().orElseThrow().getName());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testRefreshLeaderStable() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
+
+ assertNull(service.leader());
+
+ service.refreshLeader().get();
+
+ assertEquals(leader, service.leader());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testRefreshLeaderNotElected() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+
+ // Simulate running elections.
+ leader = null;
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
+
+ assertNull(service.leader());
+
+ try {
+ service.refreshLeader().get();
+
+ fail("Should fail");
+ }
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TimeoutException);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testRefreshLeaderElectedAfterDelay() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+
+ // Simulate running elections.
+ leader = null;
+
+ Timer timer = new Timer();
+
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ leader = NODES.get(0);
+ }
+ }, 500);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
+
+ assertNull(service.leader());
+
+ service.refreshLeader().get();
+
+ assertEquals(NODES.get(0), service.leader());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testRefreshLeaderWithTimeout() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(true);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
+
+ try {
+ service.refreshLeader().get(500, TimeUnit.MILLISECONDS);
+
+ fail();
+ }
+ catch (TimeoutException e) {
+ // Expected.
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testUserRequestLeaderElected() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+ mockUserInput(false, null);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
+
+ service.refreshLeader().get();
+
+ TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
+
+ assertNotNull(resp);
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testUserRequestLazyInitLeader() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+ mockUserInput(false, null);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
+
+ assertNull(service.leader());
+
+ TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
+
+ assertNotNull(resp);
+
+ assertEquals(leader, service.leader());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testUserRequestWithTimeout() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+ mockUserInput(true, null);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
+
+ try {
+ service.run(new TestCommand()).get(500, TimeUnit.MILLISECONDS);
+
+ fail();
+ }
+ catch (TimeoutException e) {
+ // Expected.
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testUserRequestLeaderNotElected() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+ mockUserInput(false, null);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY).get(3, TimeUnit.SECONDS);
+
+ Peer leader = this.leader;
+
+ assertEquals(leader, service.leader());
+
+ this.leader = null;
+
+ assertEquals(leader, service.leader());
+
+ try {
+ service.run(new TestCommand()).get();
+
+ fail("Expecting timeout");
+ }
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TimeoutException);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testUserRequestLeaderElectedAfterDelay() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+ mockUserInput(false, null);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY).get(3, TimeUnit.SECONDS);
+
+ Peer leader = this.leader;
+
+ assertEquals(leader, service.leader());
+
+ this.leader = null;
+
+ assertEquals(leader, service.leader());
+
+ Timer timer = new Timer();
+
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ RaftGroupServiceTest.this.leader = NODES.get(0);
+ }
+ }, 500);
+
+ TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
+
+ assertNotNull(resp);
+
+ assertEquals(NODES.get(0), service.leader());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testUserRequestLeaderElectedAfterDelayWithFailedNode() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+ mockUserInput(false, NODES.get(0));
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT * 3, NODES, true, DELAY).get(3, TimeUnit.SECONDS);
+
+ Peer leader = this.leader;
+
+ assertEquals(leader, service.leader());
+
+ this.leader = null;
+
+ assertEquals(leader, service.leader());
+
+ Timer timer = new Timer();
+
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ LOG.info("Set leader {}", NODES.get(1));
+
+ RaftGroupServiceTest.this.leader = NODES.get(1);
+ }
+ }, 500);
+
+ TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
+
+ assertNotNull(resp);
+
+ assertEquals(NODES.get(1), service.leader());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testUserRequestLeaderChanged() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+ mockUserInput(false, null);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY).get(3, TimeUnit.SECONDS);
+
+ Peer leader = this.leader;
+
+ assertEquals(leader, service.leader());
+
+ Peer newLeader = NODES.get(1);
+
+ this.leader = newLeader;
+
+ assertEquals(leader, service.leader());
+ assertNotEquals(leader, newLeader);
+
+ // Runs the command on an old leader. It should respond with leader changed error, when transparently retry.
+ TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
+
+ assertNotNull(resp);
+
+ assertEquals(newLeader, service.leader());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSnapshotExecutionException() throws Exception {
+ String groupId = "test";
+
+ mockSnapshotRequest(1);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
+
+ var addr = new NetworkAddress("localhost", 8082);
+
+ CompletableFuture<Void> fut = service.snapshot(new Peer(addr));
+
+ try {
+ fut.get();
+
+ fail();
+ }
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof IgniteInternalException);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSnapshotExecutionFailedResponse() throws Exception {
+ String groupId = "test";
+
+ mockSnapshotRequest(0);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get(3, TimeUnit.SECONDS);
+
+ var addr = new NetworkAddress("localhost", 8082);
+
+ CompletableFuture<Void> fut = service.snapshot(new Peer(addr));
+
+ try {
+ fut.get();
+
+ fail();
+ }
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof RaftException);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testRefreshMembers() throws Exception {
+ String groupId = "test";
+
+ List<String> respPeers = peersToIds(NODES.subList(0, 2));
+ List<String> respLearners = peersToIds(NODES.subList(2, 2));
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.getPeersRequest().onlyAlive(false).groupId(groupId).build()), anyLong()))
+ .then(invocation ->
+ completedFuture(FACTORY.getPeersResponse().peersList(respPeers).learnersList(respLearners).build()));
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY).get(3, TimeUnit.SECONDS);
+
+ assertEquals(NODES, service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ service.refreshMembers(false);
+
+ assertEquals(NODES.subList(0, 2), service.peers());
+ assertEquals(NODES.subList(2, 2), service.learners());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testAddPeer() throws Exception {
+ String groupId = "test";
+
+ List<String> respPeers = peersToIds(NODES);
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.addPeerRequest()
+ .peerId(PeerId.parsePeer(NODES.get(2).address().host() + ":" + NODES.get(2).address().port()).toString())
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation ->
+ completedFuture(FACTORY.addPeerResponse().newPeersList(respPeers).build()));
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES.subList(0, 2), true, DELAY).get(3, TimeUnit.SECONDS);
+
+ assertEquals(NODES.subList(0, 2), service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ service.addPeer(NODES.get(2)).get();
+
+ assertEquals(NODES, service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testRemovePeer() throws Exception {
+ String groupId = "test";
+
+ List<String> respPeers = peersToIds(NODES.subList(0, 2));
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.removePeerRequest()
+ .peerId(PeerId.parsePeer(NODES.get(2).address().host() + ":" + NODES.get(2).address().port()).toString())
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation ->
+ completedFuture(FACTORY.removePeerResponse().newPeersList(respPeers).build()));
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY).get(3, TimeUnit.SECONDS);
+
+ assertEquals(NODES, service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ service.removePeer(NODES.get(2)).get();
+
+ assertEquals(NODES.subList(0, 2), service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testChangePeers() throws Exception {
+ String groupId = "test";
+
+ List<String> shrunkPeers = peersToIds(NODES.subList(0, 1));
+
+ List<String> extendedPeers = peersToIds(NODES);
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.changePeersRequest()
+ .newPeersList(shrunkPeers)
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation ->
+ completedFuture(FACTORY.changePeersResponse().newPeersList(shrunkPeers).build()));
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.changePeersRequest()
+ .newPeersList(extendedPeers)
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation ->
+ completedFuture(FACTORY.changePeersResponse().newPeersList(extendedPeers).build()));
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES.subList(0, 2), true, DELAY).get(3, TimeUnit.SECONDS);
+
+ assertEquals(NODES.subList(0, 2), service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ service.changePeers(NODES.subList(0, 1)).get();
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ service.changePeers(NODES).get();
+
+ assertEquals(NODES, service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testTransferLeadership() throws Exception {
+ String groupId = "test";
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.transferLeaderRequest()
+ .leaderId(PeerId.fromPeer(NODES.get(1)).toString())
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation ->
+ completedFuture(RaftRpcFactory.DEFAULT.newResponse(FACTORY, Status.OK())));
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY).get(3, TimeUnit.SECONDS);
+
+ assertEquals(NODES.get(0), service.leader());
+
+ service.transferLeadership(NODES.get(1)).get();
+
+ assertEquals(NODES.get(1), service.leader());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testAddLearners() throws Exception {
+ String groupId = "test";
+
+ List<String> addLearners = peersToIds(NODES.subList(1, 3));
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.addLearnersRequest()
+ .learnersList(addLearners)
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation ->
+ completedFuture(FACTORY.learnersOpResponse().newLearnersList(addLearners).build()));
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES.subList(0, 1), true, DELAY).get(3, TimeUnit.SECONDS);
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ service.addLearners(NODES.subList(1, 3)).get();
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(NODES.subList(1, 3), service.learners());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testResetLearners() throws Exception {
+ String groupId = "test";
+
+ List<String> addLearners = peersToIds(NODES.subList(1, 3));
+
+ List<String> resetLearners = peersToIds(NODES.subList(2, 3));
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.resetLearnersRequest()
+ .learnersList(resetLearners)
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation ->
+ completedFuture(FACTORY.learnersOpResponse().newLearnersList(resetLearners).build()));
+
+ mockAddLearners(groupId, addLearners, addLearners);
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES.subList(0, 1), true, DELAY).get(3, TimeUnit.SECONDS);
+
+ service.addLearners(NODES.subList(1, 3)).get();
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(NODES.subList(1, 3), service.learners());
+
+ service.resetLearners(NODES.subList(2, 3)).get();
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(NODES.subList(2, 3), service.learners());
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testRemoveLearners() throws Exception {
+ String groupId = "test";
+
+ List<String> addLearners = peersToIds(NODES.subList(1, 3));
+
+ List<String> removeLearners = peersToIds(NODES.subList(2, 3));
+
+ List<String> resultLearners =
+ NODES.subList(1, 2).stream().map(p -> PeerId.fromPeer(p).toString()).collect(Collectors.toList());
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.removeLearnersRequest()
+ .learnersList(removeLearners)
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation ->
+ completedFuture(FACTORY.learnersOpResponse().newLearnersList(resultLearners).build()));
+
+ mockAddLearners(groupId, addLearners, addLearners);
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service =
+ RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES.subList(0, 1), true, DELAY).get(3, TimeUnit.SECONDS);
+
+ service.addLearners(NODES.subList(1, 3)).get();
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(NODES.subList(1, 3), service.learners());
+
+ service.removeLearners(NODES.subList(2, 3)).get();
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(NODES.subList(1, 2), service.learners());
+ }
+ /**
+ * @param delay {@code True} to create a delay before response.
+ * @param peer Fail the request targeted to given peer.
+ */
+ private void mockUserInput(boolean delay, @Nullable Peer peer) {
+ when(messagingService.invoke(
+ any(NetworkAddress.class),
+ argThat(new ArgumentMatcher<ActionRequest>() {
+ @Override public boolean matches(ActionRequest arg) {
+ return arg.command() instanceof TestCommand;
+ }
+ }),
+ anyLong()
+ )).then(invocation -> {
+ NetworkAddress target = invocation.getArgument(0);
+
+ if (peer != null && target.equals(peer.address()))
+ return failedFuture(new IgniteInternalException(new ConnectException()));
+
+ if (delay) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ fail();
+ }
+
+ return FACTORY.actionResponse().result(new TestResponse()).build();
+ });
+ }
+
+ Object resp;
+
+ if (leader == null)
+ resp = FACTORY.errorResponse().errorCode(RaftError.EPERM.getNumber()).build();
+ else if (!target.equals(leader.address()))
+ resp = FACTORY.errorResponse()
+ .errorCode(RaftError.EPERM.getNumber()).leaderId(PeerId.fromPeer(leader).toString()).build();
+ else
+ resp = FACTORY.actionResponse().result(new TestResponse()).build();
+
+ return completedFuture(resp);
+ });
+ }
+
+ /**
+ * @param delay {@code True} to delay response.
+ */
+ private void mockLeaderRequest(boolean delay) {
+ when(messagingService.invoke(any(NetworkAddress.class), any(CliRequests.GetLeaderRequest.class), anyLong()))
+ .then(invocation -> {
+ if (delay) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ fail();
+ }
+
+ return FACTORY.errorResponse().errorCode(RaftError.EPERM.getNumber()).build();
+ });
+ }
+
+ PeerId leader0 = PeerId.fromPeer(leader);
+
+ Object resp = leader0 == null ?
+ FACTORY.errorResponse().errorCode(RaftError.EPERM.getNumber()).build() :
+ FACTORY.getLeaderResponse().leaderId(leader0.toString()).build();
+
+ return completedFuture(resp);
+ });
+ }
+
+ /**
+ * @param mode Mock mode.
+ */
+ private void mockSnapshotRequest(int mode) {
+ when(messagingService.invoke(any(NetworkAddress.class), any(CliRequests.SnapshotRequest.class), anyLong()))
+ .then(invocation -> {
+ if (mode == 0) {
+ return completedFuture(FACTORY.errorResponse().errorCode(RaftError.UNKNOWN.getNumber()).
+ errorMsg("Failed to create a snapshot").build());
+ }
+ else
+ return failedFuture(new IgniteInternalException("Very bad"));
+ });
+ }
+
+ /** */
+ private void mockAddLearners(String groupId, List<String> addLearners, List<String> resultLearners) {
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.addLearnersRequest()
+ .learnersList(addLearners)
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation ->
+ completedFuture(FACTORY.learnersOpResponse().newLearnersList(resultLearners).build()));
+
+ }
+
+ /**
+ * Convert list of {@link Peer} to list of string representations.
+ *
+ * @param peers List of {@link Peer}
+ * @return List of string representations.
+ */
+ private List<String> peersToIds(List<Peer> peers) {
+ return peers.stream().map(p -> PeerId.fromPeer(p).toString()).collect(Collectors.toList());
+ }
+
+ /** */
+ private static class TestCommand implements WriteCommand {
+ }
+
+ /** */
+ private static class TestResponse {
+ }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index c71390b..07a1fb5 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -38,7 +38,6 @@
super(
clusterService,
nodeManager,
- nodeOptions.getRaftClientMessagesFactory(),
nodeOptions.getRaftMessagesFactory(),
requestExecutor
);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/BaseCliRequestProcessorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/BaseCliRequestProcessorTest.java
index 7b18a36..95cb3bc 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/BaseCliRequestProcessorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/BaseCliRequestProcessorTest.java
@@ -26,7 +26,6 @@
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.PingRequest;
import org.apache.ignite.raft.jraft.test.MockAsyncContext;
@@ -47,7 +46,7 @@
private static class MockCliRequestProcessor extends BaseCliRequestProcessor<PingRequest> {
private String peerId;
private String groupId;
- private RpcRequestClosure done;
+ private IgniteCliRpcRequestClosure done;
private CliRequestContext ctx;
MockCliRequestProcessor(String peerId, String groupId) {
@@ -67,7 +66,7 @@
}
@Override
- protected Message processRequest0(CliRequestContext ctx, PingRequest request, RpcRequestClosure done) {
+ protected Message processRequest0(CliRequestContext ctx, PingRequest request, IgniteCliRpcRequestClosure done) {
this.ctx = ctx;
this.done = done;
return RaftRpcFactory.DEFAULT.newResponse(msgFactory(), Status.OK());
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index 9cad5f9..091bbf8 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -59,9 +59,9 @@
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
@@ -95,7 +95,7 @@
public static final int PARTS = 10;
/** Factory. */
- private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
+ private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
/** Network factory. */
private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();