blob: ff2c966983f48c251103b66d161357302eb93b21 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.protocolPB;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Server-side translator that forwards requests received on
 * {@link OzoneManagerProtocolPB} to the OzoneManager implementation.
 *
 * <p>When Ratis is enabled, read-only requests are answered by the local
 * request handler and write requests are pre-executed and submitted to the
 * OM Ratis server; both paths require this OM to be the leader. When Ratis is
 * disabled, every request is executed directly against the OM and write
 * requests wait for the double buffer flush future before returning.
 */
public class OzoneManagerProtocolServerSideTranslatorPB implements
    OzoneManagerProtocolPB {
  private static final Logger LOG = LoggerFactory
      .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class);

  private final OzoneManagerRatisServer omRatisServer;
  private final RequestHandler handler;
  private final boolean isRatisEnabled;
  private final OzoneManager ozoneManager;
  private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
  // Source of the transaction index passed to validateAndUpdateCache on the
  // non-Ratis write path.
  private final AtomicLong transactionIndex = new AtomicLong(0L);
  private final OzoneProtocolMessageDispatcher<OMRequest, OMResponse>
      dispatcher;

  /**
   * Constructs an instance of the server handler.
   *
   * @param impl the OzoneManager that requests are executed against
   * @param ratisServer the OM Ratis server used for leader checks and for
   *                    submitting write requests when Ratis is enabled
   * @param metrics protocol message metrics handed to the dispatcher
   * @param enableRatis whether requests go through the Ratis code path
   */
  public OzoneManagerProtocolServerSideTranslatorPB(
      OzoneManager impl,
      OzoneManagerRatisServer ratisServer,
      ProtocolMessageMetrics metrics,
      boolean enableRatis) {
    this.ozoneManager = impl;
    handler = new OzoneManagerRequestHandler(impl);
    this.omRatisServer = ratisServer;
    this.isRatisEnabled = enableRatis;
    this.ozoneManagerDoubleBuffer =
        new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(), (i) -> {
          // Do nothing.
          // For OM NON-HA code, there is no need to save transaction index.
          // As we wait until the double buffer flushes DB to disk.
        }, isRatisEnabled);
    dispatcher = new OzoneProtocolMessageDispatcher<>("OzoneProtocol",
        metrics, LOG);
  }

  /**
   * Entry point for OM protocol requests. Delegates to the message
   * dispatcher, which invokes {@link #processRequest(OMRequest)} with the
   * request's command type and trace ID.
   *
   * TODO: Once HA is implemented fully, we should have only one server side
   * translator for OM protocol.
   *
   * @param controller RPC controller (not used by this method)
   * @param request the request to process
   * @return the response produced for the request
   * @throws ServiceException if processing the request fails
   */
  @Override
  public OMResponse submitRequest(RpcController controller,
      OMRequest request) throws ServiceException {
    return dispatcher.processRequest(request, this::processRequest,
        request.getCmdType(), request.getTraceID());
  }

  /**
   * Routes a request to the Ratis or the direct code path.
   *
   * @param request the request to process
   * @return the response for the request; an error response if preExecute
   *         of a write request fails with an IOException
   * @throws ServiceException wrapping a NotLeaderException if Ratis is
   *         enabled and this OM is not the leader
   */
  private OMResponse processRequest(OMRequest request) throws
      ServiceException {
    if (!isRatisEnabled) {
      return submitRequestDirectlyToOM(request);
    }

    if (OmUtils.isReadOnly(request)) {
      return submitReadRequestToOM(request);
    }

    if (!omRatisServer.isLeader()) {
      // Throw not leader exception before running preExecute, to avoid
      // unnecessary execution of preExecute on follower OM's. This reduces
      // the chance of, for example, allocating blocks on follower OM's.
      // Right now our leader status is updated every 1 second.
      throw createNotLeaderException();
    }

    try {
      OMClientRequest omClientRequest =
          OzoneManagerRatisUtils.createClientRequest(request);
      // Template form: the request is only converted to text if the check
      // fails, instead of on every write request.
      Preconditions.checkState(omClientRequest != null,
          "Unrecognized write command type request%s", request);
      request = omClientRequest.preExecute(ozoneManager);
    } catch (IOException ex) {
      // As some of the preExecute returns error. So handle here.
      // The reassignment above did not happen, so this is the original
      // request.
      return createErrorResponse(request, ex);
    }
    return submitRequestToRatis(request);
  }

  /**
   * Create OMResponse from the specified OMRequest and exception.
   *
   * @param omRequest the request that failed; supplies the command type
   * @param exception the failure; supplies the status and, if non-null, the
   *                  message of the response
   * @return OMResponse marked as unsuccessful
   */
  private OMResponse createErrorResponse(
      OMRequest omRequest, IOException exception) {
    OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
    // Added all write command types here, because in future if any of the
    // preExecute is changed to return IOException, we can return the error
    // OMResponse to the client.
    OMResponse.Builder omResponse = OMResponse.newBuilder()
        .setStatus(
            OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
        .setCmdType(cmdType)
        .setSuccess(false);
    if (exception.getMessage() != null) {
      omResponse.setMessage(exception.getMessage());
    }
    return omResponse.build();
  }

  /**
   * Submits request to OM's Ratis server.
   *
   * @param request the (pre-executed) write request
   * @return the response returned by the Ratis server
   * @throws ServiceException if the Ratis server fails the request
   */
  private OMResponse submitRequestToRatis(OMRequest request)
      throws ServiceException {
    //TODO: Need to remove OzoneManagerRatisClient, as now we are using
    // RatisServer Api's.
    return omRatisServer.submitRequest(request);
  }

  /**
   * Handles a read-only request locally, provided this OM is the leader.
   *
   * @param request the read-only request
   * @return the response from the request handler
   * @throws ServiceException wrapping a NotLeaderException if this OM is
   *         not the leader
   */
  private OMResponse submitReadRequestToOM(OMRequest request)
      throws ServiceException {
    // Check if this OM is the leader.
    if (!omRatisServer.isLeader()) {
      throw createNotLeaderException();
    }
    return handler.handle(request);
  }

  /**
   * Builds the exception thrown when this OM is not the leader. The wrapped
   * NotLeaderException carries this OM's peer id and, when the Ratis server
   * has a cached leader peer id, that leader's peer id as well.
   *
   * @return a ServiceException wrapping the NotLeaderException
   */
  private ServiceException createNotLeaderException() {
    RaftPeerId raftPeerId = omRatisServer.getRaftPeerId();
    Optional<RaftPeerId> leaderRaftPeerId = omRatisServer
        .getCachedLeaderPeerId();

    NotLeaderException notLeaderException;
    if (leaderRaftPeerId.isPresent()) {
      // Unwrap the Optional: Optional.toString() would yield "Optional[..]"
      // rather than the peer id itself.
      notLeaderException = new NotLeaderException(
          raftPeerId.toString(), leaderRaftPeerId.get().toString());
    } else {
      notLeaderException = new NotLeaderException(raftPeerId.toString());
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(notLeaderException.getMessage());
    }

    return new ServiceException(notLeaderException);
  }

  /**
   * Submits request directly to OM.
   *
   * <p>Read-only requests are answered by the request handler. Write
   * requests are pre-executed, assigned the next transaction index, applied
   * via validateAndUpdateCache, and then this method blocks until the
   * response's flush future completes.
   *
   * @param request the request to execute
   * @return the response for the request; an error response if an
   *         IOException is raised while handling it
   */
  private OMResponse submitRequestDirectlyToOM(OMRequest request) {
    OMClientResponse omClientResponse = null;
    long index = 0L;
    try {
      if (OmUtils.isReadOnly(request)) {
        return handler.handle(request);
      } else {
        OMClientRequest omClientRequest =
            OzoneManagerRatisUtils.createClientRequest(request);
        // Template form: the request is only converted to text if the check
        // fails, instead of on every write request.
        Preconditions.checkState(omClientRequest != null,
            "Unrecognized write command type request%s", request);
        request = omClientRequest.preExecute(ozoneManager);
        index = transactionIndex.incrementAndGet();
        // Re-create the client request from the pre-executed request.
        omClientRequest = OzoneManagerRatisUtils.createClientRequest(request);
        omClientResponse = omClientRequest.validateAndUpdateCache(
            ozoneManager, index, ozoneManagerDoubleBuffer::add);
      }
    } catch (IOException ex) {
      // As some of the preExecute returns error. So handle here.
      return createErrorResponse(request, ex);
    }

    try {
      omClientResponse.getFlushFuture().get();
      if (LOG.isTraceEnabled()) {
        LOG.trace("Future for {} is completed", request);
      }
    } catch (ExecutionException | InterruptedException ex) {
      if (ex instanceof InterruptedException) {
        // Restore the interrupt status so it is not silently lost if the
        // terminate call below does not end the current thread.
        Thread.currentThread().interrupt();
      }
      // terminate OM. As if we are in this stage means, while getting
      // response from flush future, we got an exception.
      String errorMessage = "Got error during waiting for flush to be " +
          "completed for " + "request" + request.toString();
      ExitUtils.terminate(1, errorMessage, ex, LOG);
    }
    return omClientResponse.getOMResponse();
  }

  /**
   * Stops the double buffer owned by this translator when Ratis is disabled.
   */
  public void stop() {
    // NOTE(review): the double buffer is constructed unconditionally but only
    // stopped here on the non-Ratis path - confirm that nothing needs to be
    // released when Ratis is enabled.
    if (!isRatisEnabled) {
      ozoneManagerDoubleBuffer.stop();
    }
  }
}