blob: f62904bf829b8ed27459391b90a6971038c6298a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.raft.client.service.impl;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.exception.RaftException;
import org.apache.ignite.raft.client.message.ActionRequest;
import org.apache.ignite.raft.client.message.ActionResponse;
import org.apache.ignite.raft.client.message.AddLearnersRequest;
import org.apache.ignite.raft.client.message.AddPeersRequest;
import org.apache.ignite.raft.client.message.ChangePeersResponse;
import org.apache.ignite.raft.client.message.GetLeaderRequest;
import org.apache.ignite.raft.client.message.GetLeaderResponse;
import org.apache.ignite.raft.client.message.GetPeersRequest;
import org.apache.ignite.raft.client.message.GetPeersResponse;
import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
import org.apache.ignite.raft.client.message.RaftErrorResponse;
import org.apache.ignite.raft.client.message.RemoveLearnersRequest;
import org.apache.ignite.raft.client.message.RemovePeersRequest;
import org.apache.ignite.raft.client.message.SnapshotRequest;
import org.apache.ignite.raft.client.message.TransferLeadershipRequest;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
import static java.lang.System.currentTimeMillis;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.ThreadLocalRandom.current;
/**
* The implementation of {@link RaftGroupService}
*/
public class RaftGroupServiceImpl implements RaftGroupService {
/** Logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(RaftGroupServiceImpl.class);
/** */
private volatile int timeout;
/** */
private final String groupId;
/** */
private final RaftClientMessageFactory factory;
/** */
private volatile Peer leader;
/** */
private volatile List<Peer> peers;
/** */
private volatile List<Peer> learners;
/** */
private final ClusterService cluster;
/** */
private final long retryDelay;
/** */
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
/**
* @param groupId Group id.
* @param cluster A cluster.
* @param factory A message factory.
* @param timeout Request timeout.
* @param peers Initial group configuration.
* @param refreshLeader {@code True} to synchronously refresh leader on service creation.
* @param retryDelay Retry delay.
*/
public RaftGroupServiceImpl(
String groupId,
ClusterService cluster,
RaftClientMessageFactory factory,
int timeout,
List<Peer> peers,
boolean refreshLeader,
long retryDelay
) {
this.cluster = requireNonNull(cluster);
this.peers = requireNonNull(peers);
this.factory = factory;
this.timeout = timeout;
this.groupId = groupId;
this.retryDelay = retryDelay;
if (refreshLeader) {
try {
refreshLeader().get();
}
catch (Exception e) {
LOG.error("Failed to refresh a leader", e);
}
}
}
/** {@inheritDoc} */
@Override public @NotNull String groupId() {
return groupId;
}
/** {@inheritDoc} */
@Override public long timeout() {
return timeout;
}
/** {@inheritDoc} */
@Override public void timeout(long newTimeout) {
this.timeout = timeout;
}
/** {@inheritDoc} */
@Override public Peer leader() {
return leader;
}
/** {@inheritDoc} */
@Override public List<Peer> peers() {
return peers;
}
/** {@inheritDoc} */
@Override public List<Peer> learners() {
return learners;
}
/** {@inheritDoc} */
@Override public CompletableFuture<Void> refreshLeader() {
GetLeaderRequest req = factory.getLeaderRequest().groupId(groupId).build();
CompletableFuture<GetLeaderResponse> fut = sendWithRetry(randomNode(), req, currentTimeMillis() + timeout);
return fut.thenApply(resp -> {
leader = resp.leader();
return null;
});
}
/** {@inheritDoc} */
@Override public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
GetPeersRequest req = factory.getPeersRequest().onlyAlive(onlyAlive).groupId(groupId).build();
Peer leader = this.leader;
if (leader == null)
return refreshLeader().thenCompose(res -> refreshMembers(onlyAlive));
CompletableFuture<GetPeersResponse> fut = sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
return fut.thenApply(resp -> {
peers = resp.peers();
learners = resp.learners();
return null;
});
}
/** {@inheritDoc} */
@Override public CompletableFuture<Void> addPeers(List<Peer> peers) {
Peer leader = this.leader;
if (leader == null)
return refreshLeader().thenCompose(res -> addPeers(peers));
AddPeersRequest req = factory.addPeersRequest().groupId(groupId).peers(peers).build();
CompletableFuture<ChangePeersResponse> fut = sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
return fut.thenApply(resp -> {
this.peers = resp.newPeers();
return null;
});
}
/** {@inheritDoc} */
@Override public CompletableFuture<Void> removePeers(List<Peer> peers) {
Peer leader = this.leader;
if (leader == null)
return refreshLeader().thenCompose(res -> removePeers(peers));
RemovePeersRequest req = factory.removePeerRequest().groupId(groupId).peers(peers).build();
CompletableFuture<ChangePeersResponse> fut = sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
return fut.thenApply(resp -> {
this.peers = resp.newPeers();
return null;
});
}
/** {@inheritDoc} */
@Override public CompletableFuture<Void> addLearners(List<Peer> learners) {
Peer leader = this.leader;
if (leader == null)
return refreshLeader().thenCompose(res -> addLearners(learners));
AddLearnersRequest req = factory.addLearnersRequest().groupId(groupId).learners(learners).build();
CompletableFuture<ChangePeersResponse> fut = sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
return fut.thenApply(resp -> {
this.learners = resp.newPeers();
return null;
});
}
/** {@inheritDoc} */
@Override public CompletableFuture<Void> removeLearners(List<Peer> learners) {
Peer leader = this.leader;
if (leader == null)
return refreshLeader().thenCompose(res -> removeLearners(learners));
RemoveLearnersRequest req = factory.removeLearnersRequest().groupId(groupId).learners(learners).build();
CompletableFuture<ChangePeersResponse> fut = sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
return fut.thenApply(resp -> {
this.learners = resp.newPeers();
return null;
});
}
/** {@inheritDoc} */
@Override public CompletableFuture<Void> snapshot(Peer peer) {
SnapshotRequest req = factory.snapshotRequest().groupId(groupId).build();
CompletableFuture<?> fut = cluster.messagingService().invoke(peer.getNode(), req, timeout);
return fut.thenApply(resp -> null);
}
/** {@inheritDoc} */
@Override public CompletableFuture<Void> transferLeadership(Peer newLeader) {
Peer leader = this.leader;
if (leader == null)
return refreshLeader().thenCompose(res -> transferLeadership(newLeader));
TransferLeadershipRequest req = factory.transferLeaderRequest().groupId(groupId).peer(newLeader).build();
CompletableFuture<?> fut = cluster.messagingService().invoke(newLeader.getNode(), req, timeout);
return fut.thenApply(resp -> null);
}
/** {@inheritDoc} */
@Override public <R> CompletableFuture<R> run(Command cmd) {
Peer leader = this.leader;
if (leader == null)
return refreshLeader().thenCompose(res -> run(cmd));
ActionRequest<R> req = factory.actionRequest().command(cmd).groupId(groupId).build();
CompletableFuture<ActionResponse<R>> fut = sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
return fut.thenApply(resp -> resp.result());
}
/** {@inheritDoc} */
@Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).build();
CompletableFuture<?> fut = cluster.messagingService().invoke(peer.getNode(), req, timeout);
return fut.thenApply(resp -> ((ActionResponse<R>) resp).result());
}
private <R> CompletableFuture<R> sendWithRetry(ClusterNode node, NetworkMessage req, long stopTime) {
if (currentTimeMillis() >= stopTime)
return CompletableFuture.failedFuture(new TimeoutException());
return cluster.messagingService().invoke(node, req, timeout)
.thenCompose(resp -> {
if (resp instanceof RaftErrorResponse) {
RaftErrorResponse resp0 = (RaftErrorResponse)resp;
switch (resp0.errorCode()) {
case NO_LEADER:
return composeWithDelay(() -> sendWithRetry(randomNode(), req, stopTime));
case LEADER_CHANGED:
leader = resp0.newLeader();
return composeWithDelay(() -> sendWithRetry(resp0.newLeader().getNode(), req, stopTime));
case SUCCESS:
return CompletableFuture.completedFuture(null);
default:
return CompletableFuture.failedFuture(new RaftException(resp0.errorCode()));
}
} else
return CompletableFuture.completedFuture((R) resp);
});
}
private <T> CompletableFuture<T> composeWithDelay(Supplier<CompletableFuture<T>> supplier) {
var result = new CompletableFuture<T>();
executor.schedule(() -> {
supplier.get().whenComplete((res, err) -> {
if (err == null)
result.complete(res);
else
result.completeExceptionally(err);
});
}, retryDelay, TimeUnit.MILLISECONDS);
return result;
}
/**
* @return Random node.
*/
private ClusterNode randomNode() {
List<Peer> peers0 = peers;
if (peers0 == null || peers0.isEmpty())
return null;
return peers0.get(current().nextInt(peers0.size())).getNode();
}
}