| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license |
| * agreements. See the NOTICE file distributed with this work for additional |
| * information regarding |
| * copyright ownership. The ASF licenses this file to you under the Apache |
| * License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the |
| * License. You may obtain a |
| * copy of the License at |
| * |
| * <p>http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * <p>Unless required by applicable law or agreed to in writing, software |
| * distributed under the |
| * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR |
| * CONDITIONS OF ANY KIND, either |
| * express or implied. See the License for the specific language governing |
| * permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdds.scm.server; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.hdds.client.BlockID; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos; |
| import org.apache.hadoop.hdds.utils.HddsServerUtil; |
| import org.apache.hadoop.hdds.scm.ScmInfo; |
| import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; |
| import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult; |
| import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; |
| import org.apache.hadoop.hdds.scm.exceptions.SCMException; |
| import org.apache.hadoop.hdds.scm.net.Node; |
| import org.apache.hadoop.hdds.scm.node.NodeManager; |
| import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; |
| import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.ipc.ProtobufRpcEngine; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.ipc.Server; |
| import org.apache.hadoop.ozone.audit.AuditAction; |
| import org.apache.hadoop.ozone.audit.AuditEventStatus; |
| import org.apache.hadoop.ozone.audit.AuditLogger; |
| import org.apache.hadoop.ozone.audit.AuditLoggerType; |
| import org.apache.hadoop.ozone.audit.AuditMessage; |
| import org.apache.hadoop.ozone.audit.Auditor; |
| import org.apache.hadoop.ozone.audit.SCMAction; |
| import org.apache.hadoop.ozone.common.BlockGroup; |
| import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; |
| import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics; |
| import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB; |
| |
| import com.google.common.collect.Maps; |
| import com.google.protobuf.BlockingService; |
| import com.google.protobuf.ProtocolMessageEnum; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY; |
| import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer; |
| import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName; |
| import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * SCM block protocol is the protocol used by Namenode and OzoneManager to get |
| * blocks from the SCM. |
| */ |
| public class SCMBlockProtocolServer implements |
| ScmBlockLocationProtocol, Auditor { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(SCMBlockProtocolServer.class); |
| |
| private static final AuditLogger AUDIT = |
| new AuditLogger(AuditLoggerType.SCMLOGGER); |
| |
| private final StorageContainerManager scm; |
| private final OzoneConfiguration conf; |
| private final RPC.Server blockRpcServer; |
| private final InetSocketAddress blockRpcAddress; |
| private final ProtocolMessageMetrics<ProtocolMessageEnum> |
| protocolMessageMetrics; |
| |
| /** |
| * The RPC server that listens to requests from block service clients. |
| */ |
| public SCMBlockProtocolServer(OzoneConfiguration conf, |
| StorageContainerManager scm) throws IOException { |
| this.scm = scm; |
| this.conf = conf; |
| final int handlerCount = |
| conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY, |
| OZONE_SCM_HANDLER_COUNT_DEFAULT); |
| |
| RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class, |
| ProtobufRpcEngine.class); |
| |
| protocolMessageMetrics = |
| ProtocolMessageMetrics.create( |
| "ScmBlockLocationProtocol", |
| "SCM Block location protocol counters", |
| ScmBlockLocationProtocolProtos.Type.values()); |
| |
| // SCM Block Service RPC. |
| BlockingService blockProtoPbService = |
| ScmBlockLocationProtocolProtos.ScmBlockLocationProtocolService |
| .newReflectiveBlockingService( |
| new ScmBlockLocationProtocolServerSideTranslatorPB(this, |
| protocolMessageMetrics)); |
| |
| final InetSocketAddress scmBlockAddress = HddsServerUtil |
| .getScmBlockClientBindAddress(conf); |
| blockRpcServer = |
| startRpcServer( |
| conf, |
| scmBlockAddress, |
| ScmBlockLocationProtocolPB.class, |
| blockProtoPbService, |
| handlerCount); |
| blockRpcAddress = |
| updateRPCListenAddress( |
| conf, OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, |
| blockRpcServer); |
| if (conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, |
| false)) { |
| blockRpcServer.refreshServiceAcl(conf, SCMPolicyProvider.getInstance()); |
| } |
| } |
| |
| public RPC.Server getBlockRpcServer() { |
| return blockRpcServer; |
| } |
| |
| public InetSocketAddress getBlockRpcAddress() { |
| return blockRpcAddress; |
| } |
| |
| public void start() { |
| protocolMessageMetrics.register(); |
| LOG.info( |
| StorageContainerManager.buildRpcServerStartMessage( |
| "RPC server for Block Protocol", getBlockRpcAddress())); |
| getBlockRpcServer().start(); |
| } |
| |
| public void stop() { |
| try { |
| protocolMessageMetrics.unregister(); |
| LOG.info("Stopping the RPC server for Block Protocol"); |
| getBlockRpcServer().stop(); |
| } catch (Exception ex) { |
| LOG.error("Block Protocol RPC stop failed.", ex); |
| } |
| IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager()); |
| } |
| |
| public void join() throws InterruptedException { |
| LOG.trace("Join RPC server for Block Protocol"); |
| getBlockRpcServer().join(); |
| } |
| |
| @Override |
| public List<AllocatedBlock> allocateBlock(long size, int num, |
| HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, |
| String owner, ExcludeList excludeList) throws IOException { |
| Map<String, String> auditMap = Maps.newHashMap(); |
| auditMap.put("size", String.valueOf(size)); |
| auditMap.put("type", type.name()); |
| auditMap.put("factor", factor.name()); |
| auditMap.put("owner", owner); |
| List<AllocatedBlock> blocks = new ArrayList<>(num); |
| boolean auditSuccess = true; |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Allocating {} blocks of size {}, with {}", |
| num, size, excludeList); |
| } |
| try { |
| for (int i = 0; i < num; i++) { |
| AllocatedBlock block = scm.getScmBlockManager() |
| .allocateBlock(size, type, factor, owner, excludeList); |
| if (block != null) { |
| blocks.add(block); |
| } |
| } |
| return blocks; |
| } catch (Exception ex) { |
| auditSuccess = false; |
| AUDIT.logWriteFailure( |
| buildAuditMessageForFailure(SCMAction.ALLOCATE_BLOCK, auditMap, ex) |
| ); |
| throw ex; |
| } finally { |
| if(auditSuccess) { |
| AUDIT.logWriteSuccess( |
| buildAuditMessageForSuccess(SCMAction.ALLOCATE_BLOCK, auditMap) |
| ); |
| } |
| } |
| } |
| |
| /** |
| * Delete blocks for a set of object keys. |
| * |
| * @param keyBlocksInfoList list of block keys with object keys to delete. |
| * @return deletion results. |
| */ |
| @Override |
| public List<DeleteBlockGroupResult> deleteKeyBlocks( |
| List<BlockGroup> keyBlocksInfoList) throws IOException { |
| LOG.info("SCM is informed by OM to delete {} blocks", keyBlocksInfoList |
| .size()); |
| List<DeleteBlockGroupResult> results = new ArrayList<>(); |
| Map<String, String> auditMap = Maps.newHashMap(); |
| for (BlockGroup keyBlocks : keyBlocksInfoList) { |
| ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result resultCode; |
| try { |
| // We delete blocks in an atomic operation to prevent getting |
| // into state like only a partial of blocks are deleted, |
| // which will leave key in an inconsistent state. |
| auditMap.put("keyBlockToDelete", keyBlocks.toString()); |
| scm.getScmBlockManager().deleteBlocks(keyBlocks.getBlockIDList()); |
| resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult |
| .Result.success; |
| AUDIT.logWriteSuccess( |
| buildAuditMessageForSuccess(SCMAction.DELETE_KEY_BLOCK, auditMap) |
| ); |
| } catch (SCMException scmEx) { |
| LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx); |
| AUDIT.logWriteFailure( |
| buildAuditMessageForFailure(SCMAction.DELETE_KEY_BLOCK, auditMap, |
| scmEx) |
| ); |
| switch (scmEx.getResult()) { |
| case SAFE_MODE_EXCEPTION: |
| resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult |
| .Result.safeMode; |
| break; |
| case FAILED_TO_FIND_BLOCK: |
| resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult |
| .Result.errorNotFound; |
| break; |
| default: |
| resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult |
| .Result.unknownFailure; |
| } |
| } catch (IOException ex) { |
| LOG.warn("Fail to delete blocks for object key: {}", keyBlocks |
| .getGroupID(), ex); |
| AUDIT.logWriteFailure( |
| buildAuditMessageForFailure(SCMAction.DELETE_KEY_BLOCK, auditMap, |
| ex) |
| ); |
| resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult |
| .Result.unknownFailure; |
| } |
| List<DeleteBlockResult> blockResultList = new ArrayList<>(); |
| for (BlockID blockKey : keyBlocks.getBlockIDList()) { |
| blockResultList.add(new DeleteBlockResult(blockKey, resultCode)); |
| } |
| results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(), |
| blockResultList)); |
| } |
| return results; |
| } |
| |
| @Override |
| public ScmInfo getScmInfo() throws IOException { |
| boolean auditSuccess = true; |
| try{ |
| ScmInfo.Builder builder = |
| new ScmInfo.Builder() |
| .setClusterId(scm.getScmStorageConfig().getClusterID()) |
| .setScmId(scm.getScmStorageConfig().getScmId()); |
| return builder.build(); |
| } catch (Exception ex) { |
| auditSuccess = false; |
| AUDIT.logReadFailure( |
| buildAuditMessageForFailure(SCMAction.GET_SCM_INFO, null, ex) |
| ); |
| throw ex; |
| } finally { |
| if(auditSuccess) { |
| AUDIT.logReadSuccess( |
| buildAuditMessageForSuccess(SCMAction.GET_SCM_INFO, null) |
| ); |
| } |
| } |
| } |
| |
| @Override |
| public List<DatanodeDetails> sortDatanodes(List<String> nodes, |
| String clientMachine) throws IOException { |
| boolean auditSuccess = true; |
| try{ |
| NodeManager nodeManager = scm.getScmNodeManager(); |
| Node client = null; |
| List<DatanodeDetails> possibleClients = |
| nodeManager.getNodesByAddress(clientMachine); |
| if (possibleClients.size()>0){ |
| client = possibleClients.get(0); |
| } |
| List<Node> nodeList = new ArrayList(); |
| nodes.stream().forEach(uuid -> { |
| DatanodeDetails node = nodeManager.getNodeByUuid(uuid); |
| if (node != null) { |
| nodeList.add(node); |
| } |
| }); |
| List<? extends Node> sortedNodeList = scm.getClusterMap() |
| .sortByDistanceCost(client, nodeList, nodes.size()); |
| List<DatanodeDetails> ret = new ArrayList<>(); |
| sortedNodeList.stream().forEach(node -> ret.add((DatanodeDetails)node)); |
| return ret; |
| } catch (Exception ex) { |
| auditSuccess = false; |
| AUDIT.logReadFailure( |
| buildAuditMessageForFailure(SCMAction.SORT_DATANODE, null, ex) |
| ); |
| throw ex; |
| } finally { |
| if(auditSuccess) { |
| AUDIT.logReadSuccess( |
| buildAuditMessageForSuccess(SCMAction.SORT_DATANODE, null) |
| ); |
| } |
| } |
| } |
| |
| @Override |
| public AuditMessage buildAuditMessageForSuccess( |
| AuditAction op, Map<String, String> auditMap) { |
| |
| return new AuditMessage.Builder() |
| .setUser(getRemoteUserName()) |
| .atIp(Server.getRemoteAddress()) |
| .forOperation(op) |
| .withParams(auditMap) |
| .withResult(AuditEventStatus.SUCCESS) |
| .build(); |
| } |
| |
| @Override |
| public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String, |
| String> auditMap, Throwable throwable) { |
| |
| return new AuditMessage.Builder() |
| .setUser(getRemoteUserName()) |
| .atIp(Server.getRemoteAddress()) |
| .forOperation(op) |
| .withParams(auditMap) |
| .withResult(AuditEventStatus.FAILURE) |
| .withException(throwable) |
| .build(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| stop(); |
| } |
| } |