blob: 76f6b3cd2f183e3144b6e391b8ae6a83cfadccd6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerDataProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.scm.container.common.helpers
.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers
.InvalidContainerStateException;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.AuditLoggerType;
import org.apache.hadoop.ozone.audit.AuditMarker;
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.container.common.helpers
.ContainerCommandRequestPBHelper;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
ContainerDataProto.State;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import io.opentracing.Scope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* Ozone Container dispatcher takes a call from the netty server and routes it
* to the right handler function.
*/
public class HddsDispatcher implements ContainerDispatcher, Auditor {
static final Logger LOG = LoggerFactory.getLogger(HddsDispatcher.class);
private static final AuditLogger AUDIT =
new AuditLogger(AuditLoggerType.DNLOGGER);
private final Map<ContainerType, Handler> handlers;
private final Configuration conf;
private final ContainerSet containerSet;
private final VolumeSet volumeSet;
private final StateContext context;
private final float containerCloseThreshold;
private String scmID;
private ContainerMetrics metrics;
/**
* Constructs an OzoneContainer that receives calls from
* XceiverServerHandler.
*/
public HddsDispatcher(Configuration config, ContainerSet contSet,
VolumeSet volumes, Map<ContainerType, Handler> handlers,
StateContext context, ContainerMetrics metrics) {
this.conf = config;
this.containerSet = contSet;
this.volumeSet = volumes;
this.context = context;
this.handlers = handlers;
this.metrics = metrics;
this.containerCloseThreshold = conf.getFloat(
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD,
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
}
@Override
public void init() {
}
@Override
public void shutdown() {
}
/**
* Returns true for exceptions which can be ignored for marking the container
* unhealthy.
* @param result ContainerCommandResponse error code.
* @return true if exception can be ignored, false otherwise.
*/
private boolean canIgnoreException(Result result) {
switch (result) {
case SUCCESS:
case CONTAINER_UNHEALTHY:
case CLOSED_CONTAINER_IO:
case DELETE_ON_OPEN_CONTAINER:
return true;
default:
return false;
}
}
@Override
public void buildMissingContainerSetAndValidate(
Map<Long, Long> container2BCSIDMap) {
containerSet
.buildMissingContainerSetAndValidate(container2BCSIDMap);
}
@Override
public ContainerCommandResponseProto dispatch(
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
String spanName = "HddsDispatcher." + msg.getCmdType().name();
try (Scope scope = TracingUtil
.importAndCreateScope(spanName, msg.getTraceID())) {
return dispatchRequest(msg, dispatcherContext);
}
}
@SuppressWarnings("methodlength")
private ContainerCommandResponseProto dispatchRequest(
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
Preconditions.checkNotNull(msg);
if (LOG.isTraceEnabled()) {
LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
msg.getTraceID());
}
AuditAction action = ContainerCommandRequestPBHelper.getAuditAction(
msg.getCmdType());
EventType eventType = getEventType(msg);
Map<String, String> params =
ContainerCommandRequestPBHelper.getAuditParams(msg);
Container container;
ContainerType containerType;
ContainerCommandResponseProto responseProto = null;
long startTime = System.nanoTime();
ContainerProtos.Type cmdType = msg.getCmdType();
long containerID = msg.getContainerID();
metrics.incContainerOpsMetrics(cmdType);
container = getContainer(containerID);
boolean isWriteStage =
(cmdType == ContainerProtos.Type.WriteChunk && dispatcherContext != null
&& dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.WRITE_DATA);
boolean isWriteCommitStage =
(cmdType == ContainerProtos.Type.WriteChunk && dispatcherContext != null
&& dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.COMMIT_DATA);
// if the command gets executed other than Ratis, the default wroite stage
// is WriteChunkStage.COMBINED
boolean isCombinedStage =
cmdType == ContainerProtos.Type.WriteChunk && (dispatcherContext == null
|| dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.COMBINED);
Map<Long, Long> container2BCSIDMap = null;
if (dispatcherContext != null) {
container2BCSIDMap = dispatcherContext.getContainer2BCSIDMap();
}
if (isWriteCommitStage) {
// check if the container Id exist in the loaded snapshot file. if
// it does not , it infers that , this is a restart of dn where
// the we are reapplying the transaction which was not captured in the
// snapshot.
// just add it to the list, and remove it from missing container set
// as it might have been added in the list during "init".
Preconditions.checkNotNull(container2BCSIDMap);
if (container2BCSIDMap.get(containerID) == null) {
container2BCSIDMap
.put(containerID, container.getBlockCommitSequenceId());
containerSet.getMissingContainerSet().remove(containerID);
}
}
if (getMissingContainerSet().contains(containerID)) {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID
+ " has been lost and and cannot be recreated on this DataNode",
ContainerProtos.Result.CONTAINER_MISSING);
audit(action, eventType, params, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
if (cmdType != ContainerProtos.Type.CreateContainer) {
/**
* Create Container should happen only as part of Write_Data phase of
* writeChunk.
*/
if (container == null && ((isWriteStage || isCombinedStage)
|| cmdType == ContainerProtos.Type.PutSmallFile)) {
// If container does not exist, create one for WriteChunk and
// PutSmallFile request
responseProto = createContainer(msg);
if (responseProto.getResult() != Result.SUCCESS) {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " creation failed",
responseProto.getResult());
audit(action, eventType, params, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
|| dispatcherContext == null);
if (container2BCSIDMap != null) {
// adds this container to list of containers created in the pipeline
// with initial BCSID recorded as 0.
container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
}
container = getContainer(containerID);
}
// if container not found return error
if (container == null) {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " does not exist",
ContainerProtos.Result.CONTAINER_NOT_FOUND);
audit(action, eventType, params, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
containerType = getContainerType(container);
} else {
if (!msg.hasCreateContainer()) {
audit(action, eventType, params, AuditEventStatus.FAILURE,
new Exception("MALFORMED_REQUEST"));
return ContainerUtils.malformedRequest(msg);
}
containerType = msg.getCreateContainer().getContainerType();
}
// Small performance optimization. We check if the operation is of type
// write before trying to send CloseContainerAction.
if (!HddsUtils.isReadOnly(msg)) {
sendCloseContainerActionIfNeeded(container);
}
Handler handler = getHandler(containerType);
if (handler == null) {
StorageContainerException ex = new StorageContainerException("Invalid " +
"ContainerType " + containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
// log failure
audit(action, eventType, params, AuditEventStatus.FAILURE, ex);
return ContainerUtils.logAndReturnError(LOG, ex, msg);
}
responseProto = handler.handle(msg, container, dispatcherContext);
if (responseProto != null) {
metrics.incContainerOpsLatencies(cmdType, System.nanoTime() - startTime);
// If the request is of Write Type and the container operation
// is unsuccessful, it implies the applyTransaction on the container
// failed. All subsequent transactions on the container should fail and
// hence replica will be marked unhealthy here. In this case, a close
// container action will be sent to SCM to close the container.
// ApplyTransaction called on closed Container will fail with Closed
// container exception. In such cases, ignore the exception here
// If the container is already marked unhealthy, no need to change the
// state here.
Result result = responseProto.getResult();
if (cmdType == ContainerProtos.Type.CreateContainer
&& result == Result.SUCCESS && dispatcherContext != null) {
Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
}
if (!HddsUtils.isReadOnly(msg) && !canIgnoreException(result)) {
// If the container is open/closing and the container operation
// has failed, it should be first marked unhealthy and the initiate the
// close container action. This also implies this is the first
// transaction which has failed, so the container is marked unhealthy
// right here.
// Once container is marked unhealthy, all the subsequent write
// transactions will fail with UNHEALTHY_CONTAINER exception.
// For container to be moved to unhealthy state here, the container can
// only be in open or closing state.
State containerState = container.getContainerData().getState();
Preconditions.checkState(
containerState == State.OPEN || containerState == State.CLOSING);
// mark and persist the container state to be unhealthy
try {
handler.markContainerUnhealthy(container);
} catch (IOException ioe) {
// just log the error here in case marking the container fails,
// Return the actual failure response to the client
LOG.error("Failed to mark container " + containerID + " UNHEALTHY. ",
ioe);
}
// in any case, the in memory state of the container should be unhealthy
Preconditions.checkArgument(
container.getContainerData().getState() == State.UNHEALTHY);
sendCloseContainerActionIfNeeded(container);
}
if (result == Result.SUCCESS) {
updateBCSID(container, dispatcherContext, cmdType);
audit(action, eventType, params, AuditEventStatus.SUCCESS, null);
} else {
audit(action, eventType, params, AuditEventStatus.FAILURE,
new Exception(responseProto.getMessage()));
}
return responseProto;
} else {
// log failure
audit(action, eventType, params, AuditEventStatus.FAILURE,
new Exception("UNSUPPORTED_REQUEST"));
return ContainerUtils.unsupportedRequest(msg);
}
}
private void updateBCSID(Container container,
DispatcherContext dispatcherContext, ContainerProtos.Type cmdType) {
if (dispatcherContext != null && (cmdType == ContainerProtos.Type.PutBlock
|| cmdType == ContainerProtos.Type.PutSmallFile)) {
Preconditions.checkNotNull(container);
long bcsID = container.getBlockCommitSequenceId();
long containerId = container.getContainerData().getContainerID();
Map<Long, Long> container2BCSIDMap;
container2BCSIDMap = dispatcherContext.getContainer2BCSIDMap();
Preconditions.checkNotNull(container2BCSIDMap);
Preconditions.checkArgument(container2BCSIDMap.containsKey(containerId));
// updates the latest BCSID on every putBlock or putSmallFile
// transaction over Ratis.
container2BCSIDMap.computeIfPresent(containerId, (u, v) -> v = bcsID);
}
}
/**
* Create a container using the input container request.
* @param containerRequest - the container request which requires container
* to be created.
* @return ContainerCommandResponseProto container command response.
*/
@VisibleForTesting
ContainerCommandResponseProto createContainer(
ContainerCommandRequestProto containerRequest) {
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto.newBuilder();
ContainerType containerType =
ContainerProtos.ContainerType.KeyValueContainer;
createRequest.setContainerType(containerType);
ContainerCommandRequestProto.Builder requestBuilder =
ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.CreateContainer)
.setContainerID(containerRequest.getContainerID())
.setCreateContainer(createRequest.build())
.setPipelineID(containerRequest.getPipelineID())
.setDatanodeUuid(containerRequest.getDatanodeUuid())
.setTraceID(containerRequest.getTraceID());
// TODO: Assuming the container type to be KeyValueContainer for now.
// We need to get container type from the containerRequest.
Handler handler = getHandler(containerType);
return handler.handle(requestBuilder.build(), null, null);
}
/**
* This will be called as a part of creating the log entry during
* startTransaction in Ratis on the leader node. In such cases, if the
* container is not in open state for writing we should just fail.
* Leader will propagate the exception to client.
* @param msg container command proto
* @throws StorageContainerException In case container state is open for write
* requests and in invalid state for read requests.
*/
@Override
public void validateContainerCommand(
ContainerCommandRequestProto msg) throws StorageContainerException {
long containerID = msg.getContainerID();
Container container = getContainer(containerID);
if (container == null) {
return;
}
ContainerType containerType = container.getContainerType();
ContainerProtos.Type cmdType = msg.getCmdType();
AuditAction action =
ContainerCommandRequestPBHelper.getAuditAction(cmdType);
EventType eventType = getEventType(msg);
Map<String, String> params =
ContainerCommandRequestPBHelper.getAuditParams(msg);
Handler handler = getHandler(containerType);
if (handler == null) {
StorageContainerException ex = new StorageContainerException(
"Invalid " + "ContainerType " + containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
audit(action, eventType, params, AuditEventStatus.FAILURE, ex);
throw ex;
}
State containerState = container.getContainerState();
if (!HddsUtils.isReadOnly(msg) && containerState != State.OPEN) {
switch (cmdType) {
case CreateContainer:
// Create Container is idempotent. There is nothing to validate.
break;
case CloseContainer:
// If the container is unhealthy, closeContainer will be rejected
// while execution. Nothing to validate here.
break;
default:
// if the container is not open, no updates can happen. Just throw
// an exception
ContainerNotOpenException cex = new ContainerNotOpenException(
"Container " + containerID + " in " + containerState + " state");
audit(action, eventType, params, AuditEventStatus.FAILURE, cex);
throw cex;
}
} else if (HddsUtils.isReadOnly(msg) && containerState == State.INVALID) {
InvalidContainerStateException iex = new InvalidContainerStateException(
"Container " + containerID + " in " + containerState + " state");
audit(action, eventType, params, AuditEventStatus.FAILURE, iex);
throw iex;
}
}
/**
* If the container usage reaches the close threshold or the container is
* marked unhealthy we send Close ContainerAction to SCM.
* @param container current state of container
*/
private void sendCloseContainerActionIfNeeded(Container container) {
// We have to find a more efficient way to close a container.
boolean isSpaceFull = isContainerFull(container);
boolean shouldClose = isSpaceFull || isContainerUnhealthy(container);
if (shouldClose) {
ContainerData containerData = container.getContainerData();
ContainerAction.Reason reason =
isSpaceFull ? ContainerAction.Reason.CONTAINER_FULL :
ContainerAction.Reason.CONTAINER_UNHEALTHY;
ContainerAction action = ContainerAction.newBuilder()
.setContainerID(containerData.getContainerID())
.setAction(ContainerAction.Action.CLOSE).setReason(reason).build();
context.addContainerActionIfAbsent(action);
}
}
private boolean isContainerFull(Container container) {
boolean isOpen = Optional.ofNullable(container)
.map(cont -> cont.getContainerState() == ContainerDataProto.State.OPEN)
.orElse(Boolean.FALSE);
if (isOpen) {
ContainerData containerData = container.getContainerData();
double containerUsedPercentage =
1.0f * containerData.getBytesUsed() / containerData.getMaxSize();
return containerUsedPercentage >= containerCloseThreshold;
} else {
return false;
}
}
private boolean isContainerUnhealthy(Container container) {
return Optional.ofNullable(container).map(
cont -> (cont.getContainerState() ==
ContainerDataProto.State.UNHEALTHY))
.orElse(Boolean.FALSE);
}
@Override
public Handler getHandler(ContainerProtos.ContainerType containerType) {
return handlers.get(containerType);
}
@Override
public void setScmId(String scmId) {
Preconditions.checkNotNull(scmId, "scmId Cannot be null");
if (this.scmID == null) {
this.scmID = scmId;
for (Map.Entry<ContainerType, Handler> handlerMap : handlers.entrySet()) {
handlerMap.getValue().setScmID(scmID);
}
}
}
@VisibleForTesting
public Container getContainer(long containerID) {
return containerSet.getContainer(containerID);
}
@VisibleForTesting
public Set<Long> getMissingContainerSet() {
return containerSet.getMissingContainerSet();
}
private ContainerType getContainerType(Container container) {
return container.getContainerType();
}
@VisibleForTesting
public void setMetricsForTesting(ContainerMetrics containerMetrics) {
this.metrics = containerMetrics;
}
private EventType getEventType(ContainerCommandRequestProto msg) {
return HddsUtils.isReadOnly(msg) ? EventType.READ : EventType.WRITE;
}
private void audit(AuditAction action, EventType eventType,
Map<String, String> params, AuditEventStatus result, Throwable exception){
AuditMessage amsg;
switch (result) {
case SUCCESS:
if(eventType == EventType.READ &&
AUDIT.getLogger().isInfoEnabled(AuditMarker.READ.getMarker())) {
amsg = buildAuditMessageForSuccess(action, params);
AUDIT.logReadSuccess(amsg);
} else if(eventType == EventType.WRITE &&
AUDIT.getLogger().isInfoEnabled(AuditMarker.WRITE.getMarker())) {
amsg = buildAuditMessageForSuccess(action, params);
AUDIT.logWriteSuccess(amsg);
}
break;
case FAILURE:
if(eventType == EventType.READ &&
AUDIT.getLogger().isErrorEnabled(AuditMarker.READ.getMarker())) {
amsg = buildAuditMessageForFailure(action, params, exception);
AUDIT.logReadFailure(amsg);
} else if(eventType == EventType.WRITE &&
AUDIT.getLogger().isErrorEnabled(AuditMarker.WRITE.getMarker())) {
amsg = buildAuditMessageForFailure(action, params, exception);
AUDIT.logWriteFailure(amsg);
}
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Invalid audit event status - " + result);
}
}
}
//TODO: use GRPC to fetch user and ip details
@Override
public AuditMessage buildAuditMessageForSuccess(AuditAction op,
Map<String, String> auditMap) {
return new AuditMessage.Builder()
.setUser(null)
.atIp(null)
.forOperation(op.getAction())
.withParams(auditMap)
.withResult(AuditEventStatus.SUCCESS.toString())
.withException(null)
.build();
}
//TODO: use GRPC to fetch user and ip details
@Override
public AuditMessage buildAuditMessageForFailure(AuditAction op,
Map<String, String> auditMap, Throwable throwable) {
return new AuditMessage.Builder()
.setUser(null)
.atIp(null)
.forOperation(op.getAction())
.withParams(auditMap)
.withResult(AuditEventStatus.FAILURE.toString())
.withException(throwable)
.build();
}
enum EventType {
READ,
WRITE
}
}