blob: b6809e6df45018cf49d75a8eb4794ef87086cdd1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.raft.rpc;
import java.util.concurrent.ExecutionException;
import org.apache.hugegraph.backend.store.raft.RaftStoreClosure;
import org.apache.hugegraph.backend.store.raft.StoreCommand;
import org.slf4j.Logger;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.core.NodeImpl;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RaftClientService;
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
import com.alipay.sofa.jraft.util.Endpoint;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.store.raft.RaftClosure;
import org.apache.hugegraph.backend.store.raft.RaftContext;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.CommonResponse;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreCommandRequest;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreCommandResponse;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.ZeroByteStringHelper;
public class RpcForwarder {
private static final Logger LOG = Log.logger(RpcForwarder.class);
private final PeerId nodeId;
private final RaftClientService rpcClient;
public RpcForwarder(Node node) {
this.nodeId = node.getNodeId().getPeerId();
this.rpcClient = ((NodeImpl) node).getRpcService();
E.checkNotNull(this.rpcClient, "rpc client");
}
public void forwardToLeader(PeerId leaderId, StoreCommand command,
RaftStoreClosure future) {
E.checkNotNull(leaderId, "leader id");
E.checkState(!leaderId.equals(this.nodeId),
"Invalid state: current node is the leader, there is " +
"no need to forward the request");
LOG.debug("The node {} forward request to leader {}",
this.nodeId, leaderId);
StoreCommandRequest.Builder builder = StoreCommandRequest.newBuilder();
builder.setType(command.type());
builder.setAction(command.action());
builder.setData(ZeroByteStringHelper.wrap(command.data()));
StoreCommandRequest request = builder.build();
RpcResponseClosure<StoreCommandResponse> responseClosure;
responseClosure = new RpcResponseClosure<StoreCommandResponse>() {
@Override
public void setResponse(StoreCommandResponse response) {
if (response.getStatus()) {
LOG.debug("StoreCommandResponse status ok");
// This code forwards the request to the Raft leader and considers the operation successful
// if it's forwarded successfully. It returns a RaftClosure because the calling
// logic expects a RaftClosure result. Specifically, if the current instance is the Raft leader,
// it executes the corresponding logic locally and notifies the calling logic asynchronously
// via RaftClosure. Therefore, the result is returned as a RaftClosure here.
RaftClosure<Status> supplierFuture = new RaftClosure<>();
supplierFuture.complete(Status.OK());
future.complete(Status.OK(), () -> supplierFuture);
} else {
LOG.debug("StoreCommandResponse status error");
Status status = new Status(RaftError.UNKNOWN,
"fowared request failed");
BackendException e = new BackendException(
"Current node isn't leader, leader " +
"is [%s], failed to forward request " +
"to leader: %s",
leaderId, response.getMessage());
future.failure(status, e);
}
}
@Override
public void run(Status status) {
future.run(status);
}
};
this.waitRpc(leaderId.getEndpoint(), request, responseClosure);
}
public <T extends Message> RaftClosure<T> forwardToLeader(PeerId leaderId,
Message request) {
E.checkNotNull(leaderId, "leader id");
E.checkState(!leaderId.equals(this.nodeId),
"Invalid state: current node is the leader, there is " +
"no need to forward the request");
LOG.debug("The node '{}' forward request to leader '{}'",
this.nodeId, leaderId);
RaftClosure<T> future = new RaftClosure<>();
RpcResponseClosure<T> responseDone = new RpcResponseClosure<T>() {
@Override
public void setResponse(T response) {
FieldDescriptor fd = response.getDescriptorForType()
.findFieldByName("common");
Object object = response.getField(fd);
E.checkState(object instanceof CommonResponse,
"The common field must be instance of " +
"CommonResponse, actual is '%s'",
object != null ? object.getClass() : null);
CommonResponse commonResponse = (CommonResponse) object;
if (commonResponse.getStatus()) {
future.complete(Status.OK(), () -> response);
} else {
Status status = new Status(RaftError.UNKNOWN,
"fowared request failed");
BackendException e = new BackendException(
"Current node isn't leader, leader " +
"is [%s], failed to forward request " +
"to leader: %s",
leaderId, commonResponse.getMessage());
future.failure(status, e);
}
}
@Override
public void run(Status status) {
future.run(status);
}
};
this.waitRpc(leaderId.getEndpoint(), request, responseDone);
return future;
}
private <T extends Message> void waitRpc(Endpoint endpoint, Message request,
RpcResponseClosure<T> done) {
E.checkNotNull(endpoint, "leader endpoint");
try {
this.rpcClient.invokeWithDone(endpoint, request, done,
RaftContext.WAIT_RPC_TIMEOUT)
.get();
} catch (InterruptedException e) {
throw new BackendException("Invoke rpc request was interrupted, " +
"please try again later", e);
} catch (ExecutionException e) {
throw new BackendException("Failed to invoke rpc request", e);
}
}
}