blob: e302956d3962c5a1c22b4b8efd2123e5df6a0442 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om.ratis;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The OM StateMachine is the state machine for OM Ratis server. It is
* responsible for applying ratis committed transactions to
* {@link OzoneManager}.
*/
public class OzoneManagerStateMachine extends BaseStateMachine {
static final Logger LOG =
LoggerFactory.getLogger(OzoneManagerStateMachine.class);
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
private final OzoneManagerRatisServer omRatisServer;
private final OzoneManager ozoneManager;
private OzoneManagerHARequestHandler handler;
private RaftGroupId raftGroupId;
private long lastAppliedIndex;
private OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
private final OMRatisSnapshotInfo snapshotInfo;
private final ExecutorService executorService;
private final ExecutorService installSnapshotExecutor;
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
this.ozoneManager = omRatisServer.getOzoneManager();
this.snapshotInfo = ozoneManager.getSnapshotInfo();
updateLastAppliedIndexWithSnaphsotIndex();
this.ozoneManagerDoubleBuffer =
new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(),
this::updateLastAppliedIndex);
this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager,
ozoneManagerDoubleBuffer);
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
}
/**
* Initializes the State Machine with the given server, group and storage.
*/
@Override
public void initialize(RaftServer server, RaftGroupId id,
RaftStorage raftStorage) throws IOException {
lifeCycle.startAndTransition(() -> {
super.initialize(server, id, raftStorage);
this.raftGroupId = id;
storage.init(raftStorage);
});
}
@Override
public SnapshotInfo getLatestSnapshot() {
return snapshotInfo;
}
/**
* Called to notify state machine about indexes which are processed
* internally by Raft Server, this currently happens when conf entries are
* processed in raft Server. This keep state machine to keep a track of index
* updates.
* @param term term of the current log entry
* @param index index which is being updated
*/
@Override
public void notifyIndexUpdate(long term, long index) {
// SnapshotInfo should be updated when the term changes.
// The index here refers to the log entry index and the index in
// SnapshotInfo represents the snapshotIndex i.e. the index of the last
// transaction included in the snapshot. Hence, snaphsotInfo#index is not
// updated here.
snapshotInfo.updateTerm(term);
}
/**
* Validate/pre-process the incoming update request in the state machine.
* @return the content to be written to the log entry. Null means the request
* should be rejected.
* @throws IOException thrown by the state machine while validating
*/
@Override
public TransactionContext startTransaction(
RaftClientRequest raftClientRequest) throws IOException {
ByteString messageContent = raftClientRequest.getMessage().getContent();
OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(
messageContent);
Preconditions.checkArgument(raftClientRequest.getRaftGroupId().equals(
raftGroupId));
try {
handler.validateRequest(omRequest);
} catch (IOException ioe) {
TransactionContext ctxt = TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.build();
ctxt.setException(ioe);
return ctxt;
}
return handleStartTransactionRequests(raftClientRequest, omRequest);
}
/*
* Apply a committed log entry to the state machine.
*/
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
try {
OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
trx.getStateMachineLogEntry().getLogData());
long trxLogIndex = trx.getLogEntry().getIndex();
// In the current approach we have one single global thread executor.
// with single thread. Right now this is being done for correctness, as
// applyTransaction will be run on multiple OM's we want to execute the
// transactions in the same order on all OM's, otherwise there is a
// chance that OM replica's can be out of sync.
// TODO: In this way we are making all applyTransactions in
// OM serial order. Revisit this in future to use multiple executors for
// volume/bucket.
// Reason for not immediately implementing executor per volume is, if
// one executor operations are slow, we cannot update the
// lastAppliedIndex in OzoneManager StateMachine, even if other
// executor has completed the transactions with id more.
// We have 300 transactions, And for each volume we have transactions
// of 150. Volume1 transactions 0 - 149 and Volume2 transactions 150 -
// 299.
// Example: Executor1 - Volume1 - 100 (current completed transaction)
// Example: Executor2 - Volume2 - 299 (current completed transaction)
// Now we have applied transactions of 0 - 100 and 149 - 299. We
// cannot update lastAppliedIndex to 299. We need to update it to 100,
// since 101 - 149 are not applied. When OM restarts it will
// applyTransactions from lastAppliedIndex.
// We can update the lastAppliedIndex to 100, and update it to 299,
// only after completing 101 - 149. In initial stage, we are starting
// with single global executor. Will revisit this when needed.
CompletableFuture<Message> future = CompletableFuture.supplyAsync(
() -> runCommand(request, trxLogIndex), executorService);
return future;
} catch (IOException e) {
return completeExceptionally(e);
}
}
/**
* Query the state machine. The request must be read-only.
*/
@Override
public CompletableFuture<Message> query(Message request) {
try {
OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(
request.getContent());
return CompletableFuture.completedFuture(queryCommand(omRequest));
} catch (IOException e) {
return completeExceptionally(e);
}
}
@Override
public void pause() {
lifeCycle.transition(LifeCycle.State.PAUSING);
lifeCycle.transition(LifeCycle.State.PAUSED);
ozoneManagerDoubleBuffer.stop();
}
/**
* Unpause the StateMachine, re-initialize the DoubleBuffer and update the
* lastAppliedIndex. This should be done after uploading new state to the
* StateMachine.
*/
public void unpause(long newLastAppliedSnaphsotIndex) {
lifeCycle.startAndTransition(() -> {
this.ozoneManagerDoubleBuffer =
new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(),
this::updateLastAppliedIndex);
this.updateLastAppliedIndex(newLastAppliedSnaphsotIndex);
});
}
/**
* Take OM Ratis snapshot. Write the snapshot index to file. Snapshot index
* is the log index corresponding to the last applied transaction on the OM
* State Machine.
*
* @return the last applied index on the state machine which has been
* stored in the snapshot file.
*/
@Override
public long takeSnapshot() throws IOException {
LOG.info("Saving Ratis snapshot on the OM.");
if (ozoneManager != null) {
return ozoneManager.saveRatisSnapshot();
}
return 0;
}
/**
* Leader OM has purged entries from its log. To catch up, OM must download
* the latest checkpoint from the leader OM and install it.
* @param roleInfoProto the leader node information
* @param firstTermIndexInLog TermIndex of the first append entry available
* in the Leader's log.
* @return the last term index included in the installed snapshot.
*/
@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
String leaderNodeId = RaftPeerId.valueOf(roleInfoProto.getSelf().getId())
.toString();
LOG.info("Received install snapshot notificaiton form OM leader: {} with " +
"term index: {}", leaderNodeId, firstTermIndexInLog);
if (!roleInfoProto.getRole().equals(RaftProtos.RaftPeerRole.LEADER)) {
// A non-leader Ratis server should not send this notification.
LOG.error("Received Install Snapshot notification from non-leader OM " +
"node: {}. Ignoring the notification.", leaderNodeId);
return completeExceptionally(new OMException("Received notification to " +
"install snaphost from non-leader OM node",
OMException.ResultCodes.RATIS_ERROR));
}
CompletableFuture<TermIndex> future = CompletableFuture.supplyAsync(
() -> ozoneManager.installSnapshot(leaderNodeId),
installSnapshotExecutor);
return future;
}
/**
* Notifies the state machine that the raft peer is no longer leader.
*/
@Override
public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
throws IOException {
omRatisServer.updateServerRole();
}
/**
* Handle the RaftClientRequest and return TransactionContext object.
* @param raftClientRequest
* @param omRequest
* @return TransactionContext
*/
private TransactionContext handleStartTransactionRequests(
RaftClientRequest raftClientRequest, OMRequest omRequest) {
return TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(raftClientRequest.getMessage().getContent())
.build();
}
/**
* Submits write request to OM and returns the response Message.
* @param request OMRequest
* @return response from OM
* @throws ServiceException
*/
private Message runCommand(OMRequest request, long trxLogIndex) {
OMResponse response = handler.handleApplyTransaction(request, trxLogIndex);
lastAppliedIndex = trxLogIndex;
return OMRatisHelper.convertResponseToMessage(response);
}
@SuppressWarnings("HiddenField")
public void updateLastAppliedIndex(long lastAppliedIndex) {
this.lastAppliedIndex = lastAppliedIndex;
}
public void updateLastAppliedIndexWithSnaphsotIndex() {
this.lastAppliedIndex = snapshotInfo.getIndex();
}
/**
* Submits read request to OM and returns the response Message.
* @param request OMRequest
* @return response from OM
* @throws ServiceException
*/
private Message queryCommand(OMRequest request) {
OMResponse response = handler.handle(request);
return OMRatisHelper.convertResponseToMessage(response);
}
public long getLastAppliedIndex() {
return lastAppliedIndex;
}
private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
final CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
@VisibleForTesting
public void setHandler(OzoneManagerHARequestHandler handler) {
this.handler = handler;
}
@VisibleForTesting
public void setRaftGroupId(RaftGroupId raftGroupId) {
this.raftGroupId = raftGroupId;
}
public void stop() {
ozoneManagerDoubleBuffer.stop();
HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
}
}