| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.hadoop.ozone.protocolPB; |
| |
| import static org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.getRequest; |
| //import static org.apache.hadoop.ozone.protocol.proto |
| // .OzoneManagerProtocolProtos.Type.PrepareStatus; |
| |
| import java.io.IOException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher; |
| import org.apache.hadoop.hdds.tracing.TracingUtil; |
| import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics; |
| import org.apache.hadoop.ozone.OmUtils; |
| import org.apache.hadoop.ozone.om.OzoneManager; |
| import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; |
| import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException; |
| import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; |
| import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer; |
| import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; |
| import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus; |
| import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; |
| import org.apache.hadoop.ozone.om.request.OMClientRequest; |
| import org.apache.hadoop.ozone.om.response.OMClientResponse; |
| import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; |
| import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; |
| |
| import com.google.protobuf.ProtocolMessageEnum; |
| import com.google.protobuf.RpcController; |
| import com.google.protobuf.ServiceException; |
| import org.apache.ratis.protocol.RaftPeerId; |
| import org.apache.ratis.util.ExitUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY; |
| import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER; |
| |
| /** |
| * This class is the server-side translator that forwards requests received on |
| * {@link OzoneManagerProtocolPB} |
| * to the OzoneManagerService server implementation. |
| */ |
| public class OzoneManagerProtocolServerSideTranslatorPB implements |
| OzoneManagerProtocolPB { |
| private static final Logger LOG = LoggerFactory |
| .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class); |
| private final OzoneManagerRatisServer omRatisServer; |
| private final RequestHandler handler; |
| private final boolean isRatisEnabled; |
| private final OzoneManager ozoneManager; |
| private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer; |
| private final AtomicLong transactionIndex; |
| private final OzoneProtocolMessageDispatcher<OMRequest, OMResponse, |
| ProtocolMessageEnum> dispatcher; |
| |
| /** |
| * Constructs an instance of the server handler. |
| * |
| * @param impl OzoneManagerProtocolPB |
| */ |
| public OzoneManagerProtocolServerSideTranslatorPB( |
| OzoneManager impl, |
| OzoneManagerRatisServer ratisServer, |
| ProtocolMessageMetrics<ProtocolMessageEnum> metrics, |
| boolean enableRatis, |
| long lastTransactionIndexForNonRatis) { |
| this.ozoneManager = impl; |
| this.isRatisEnabled = enableRatis; |
| // Update the transactionIndex with the last TransactionIndex read from DB. |
| // New requests should have transactionIndex incremented from this index |
| // onwards to ensure unique objectIDs. |
| this.transactionIndex = new AtomicLong(lastTransactionIndexForNonRatis); |
| |
| if (isRatisEnabled) { |
| // In case of ratis is enabled, handler in ServerSideTransaltorPB is used |
| // only for read requests and read requests does not require |
| // double-buffer to be initialized. |
| this.ozoneManagerDoubleBuffer = null; |
| handler = new OzoneManagerRequestHandler(impl, null); |
| } else { |
| this.ozoneManagerDoubleBuffer = new OzoneManagerDoubleBuffer.Builder() |
| .setOmMetadataManager(ozoneManager.getMetadataManager()) |
| // Do nothing. |
| // For OM NON-HA code, there is no need to save transaction index. |
| // As we wait until the double buffer flushes DB to disk. |
| .setOzoneManagerRatisSnapShot((i) -> { |
| }) |
| .enableRatis(isRatisEnabled) |
| .enableTracing(TracingUtil.isTracingEnabled( |
| ozoneManager.getConfiguration())) |
| .build(); |
| handler = new OzoneManagerRequestHandler(impl, ozoneManagerDoubleBuffer); |
| } |
| this.omRatisServer = ratisServer; |
| dispatcher = new OzoneProtocolMessageDispatcher<>("OzoneProtocol", |
| metrics, LOG); |
| } |
| |
| /** |
| * Submit requests to Ratis server for OM HA implementation. |
| * TODO: Once HA is implemented fully, we should have only one server side |
| * translator for OM protocol. |
| */ |
| @Override |
| public OMResponse submitRequest(RpcController controller, |
| OMRequest request) throws ServiceException { |
| |
| return dispatcher.processRequest(request, this::processRequest, |
| request.getCmdType(), request.getTraceID()); |
| } |
| |
| private OMResponse processRequest(OMRequest request) throws |
| ServiceException { |
| RaftServerStatus raftServerStatus; |
| if (isRatisEnabled) { |
| // Check if the request is a read only request |
| if (OmUtils.isReadOnly(request)) { |
| return submitReadRequestToOM(request); |
| } else { |
| raftServerStatus = omRatisServer.checkLeaderStatus(); |
| if (raftServerStatus == LEADER_AND_READY) { |
| try { |
| OMClientRequest omClientRequest = getRequest(ozoneManager, request); |
| request = omClientRequest.preExecute(ozoneManager); |
| } catch (IOException ex) { |
| // As some of the preExecute returns error. So handle here. |
| return createErrorResponse(request, ex); |
| } |
| return submitRequestToRatis(request); |
| } else { |
| throw createLeaderErrorException(raftServerStatus); |
| } |
| } |
| } else { |
| return submitRequestDirectlyToOM(request); |
| } |
| } |
| |
| /** |
| * Create OMResponse from the specified OMRequest and exception. |
| * |
| * @param omRequest |
| * @param exception |
| * @return OMResponse |
| */ |
| private OMResponse createErrorResponse( |
| OMRequest omRequest, IOException exception) { |
| // Added all write command types here, because in future if any of the |
| // preExecute is changed to return IOException, we can return the error |
| // OMResponse to the client. |
| OMResponse.Builder omResponse = OMResponse.newBuilder() |
| .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception)) |
| .setCmdType(omRequest.getCmdType()) |
| .setTraceID(omRequest.getTraceID()) |
| .setSuccess(false); |
| if (exception.getMessage() != null) { |
| omResponse.setMessage(exception.getMessage()); |
| } |
| return omResponse.build(); |
| } |
| |
| /** |
| * Submits request to OM's Ratis server. |
| */ |
| private OMResponse submitRequestToRatis(OMRequest request) |
| throws ServiceException { |
| return omRatisServer.submitRequest(request); |
| } |
| |
| private OMResponse submitReadRequestToOM(OMRequest request) |
| throws ServiceException { |
| // Check if this OM is the leader. |
| RaftServerStatus raftServerStatus = omRatisServer.checkLeaderStatus(); |
| if (raftServerStatus == LEADER_AND_READY) { |
| return handler.handleReadRequest(request); |
| } else { |
| throw createLeaderErrorException(raftServerStatus); |
| } |
| } |
| |
| private ServiceException createNotLeaderException() { |
| RaftPeerId raftPeerId = omRatisServer.getRaftPeerId(); |
| |
| // TODO: Set suggest leaderID. Right now, client is not using suggest |
| // leaderID. Need to fix this. |
| |
| OMNotLeaderException notLeaderException = |
| new OMNotLeaderException(raftPeerId); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(notLeaderException.getMessage()); |
| } |
| |
| return new ServiceException(notLeaderException); |
| } |
| |
| private ServiceException createLeaderErrorException( |
| RaftServerStatus raftServerStatus) { |
| if (raftServerStatus == NOT_LEADER) { |
| return createNotLeaderException(); |
| } else { |
| return createLeaderNotReadyException(); |
| } |
| } |
| |
| |
| private ServiceException createLeaderNotReadyException() { |
| RaftPeerId raftPeerId = omRatisServer.getRaftPeerId(); |
| |
| OMLeaderNotReadyException leaderNotReadyException = |
| new OMLeaderNotReadyException(raftPeerId.toString() + " is Leader " + |
| "but not ready to process request"); |
| |
| return new ServiceException(leaderNotReadyException); |
| } |
| |
| /** |
| * Submits request directly to OM. |
| */ |
| private OMResponse submitRequestDirectlyToOM(OMRequest request) { |
| OMClientResponse omClientResponse = null; |
| long index = 0L; |
| try { |
| if (OmUtils.isReadOnly(request)) { |
| return handler.handleReadRequest(request); |
| } else { |
| OMClientRequest omClientRequest = getRequest(ozoneManager, request); |
| request = omClientRequest.preExecute(ozoneManager); |
| index = transactionIndex.incrementAndGet(); |
| omClientResponse = handler.handleWriteRequest(request, index); |
| } |
| } catch(IOException ex) { |
| // As some of the preExecute returns error. So handle here. |
| return createErrorResponse(request, ex); |
| } |
| try { |
| omClientResponse.getFlushFuture().get(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Future for {} is completed", request); |
| } |
| } catch (ExecutionException | InterruptedException ex) { |
| // terminate OM. As if we are in this stage means, while getting |
| // response from flush future, we got an exception. |
| String errorMessage = "Got error during waiting for flush to be " + |
| "completed for " + "request" + request.toString(); |
| ExitUtils.terminate(1, errorMessage, ex, LOG); |
| } |
| return omClientResponse.getOMResponse(); |
| } |
| |
| public void stop() { |
| if (!isRatisEnabled) { |
| ozoneManagerDoubleBuffer.stop(); |
| } |
| } |
| } |