blob: 76772586b3ec98ae10329d7fcfc170d3a748a69c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
/**
* InMemoryLevelDBAliasMapClient is the client for the InMemoryAliasMapServer.
* This is used by the Datanode and fs2img to store and retrieve FileRegions
* based on the given Block.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
implements Configurable {
private static final Logger LOG =
LoggerFactory.getLogger(InMemoryLevelDBAliasMapClient.class);
private Configuration conf;
private Collection<InMemoryAliasMapProtocol> aliasMaps;
@Override
public void close() {
if (aliasMaps != null) {
for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
RPC.stopProxy(aliasMap);
}
}
}
class LevelDbReader extends BlockAliasMap.Reader<FileRegion> {
private InMemoryAliasMapProtocol aliasMap;
LevelDbReader(InMemoryAliasMapProtocol aliasMap) {
this.aliasMap = aliasMap;
}
@Override
public Optional<FileRegion> resolve(Block block) throws IOException {
Optional<ProvidedStorageLocation> read = aliasMap.read(block);
return read.map(psl -> new FileRegion(block, psl));
}
@Override
public void close() throws IOException {
}
private class LevelDbIterator
extends BlockAliasMap<FileRegion>.ImmutableIterator {
private Iterator<FileRegion> iterator;
private Optional<Block> nextMarker;
LevelDbIterator() {
batch(Optional.empty());
}
private void batch(Optional<Block> newNextMarker) {
try {
InMemoryAliasMap.IterationResult iterationResult =
aliasMap.list(newNextMarker);
List<FileRegion> fileRegions = iterationResult.getFileRegions();
this.iterator = fileRegions.iterator();
this.nextMarker = iterationResult.getNextBlock();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean hasNext() {
return iterator.hasNext() || nextMarker.isPresent();
}
@Override
public FileRegion next() {
if (iterator.hasNext()) {
return iterator.next();
} else {
if (nextMarker.isPresent()) {
batch(nextMarker);
return next();
} else {
throw new NoSuchElementException();
}
}
}
}
@Override
public Iterator<FileRegion> iterator() {
return new LevelDbIterator();
}
}
class LevelDbWriter extends BlockAliasMap.Writer<FileRegion> {
private InMemoryAliasMapProtocol aliasMap;
LevelDbWriter(InMemoryAliasMapProtocol aliasMap) {
this.aliasMap = aliasMap;
}
@Override
public void store(FileRegion fileRegion) throws IOException {
aliasMap.write(fileRegion.getBlock(),
fileRegion.getProvidedStorageLocation());
}
@Override
public void remove(Block block) throws IOException {
aliasMap.remove(block);
}
@Override
public void close() throws IOException {
}
}
InMemoryLevelDBAliasMapClient() {
if (UserGroupInformation.isSecurityEnabled()) {
throw new UnsupportedOperationException("Unable to start "
+ "InMemoryLevelDBAliasMapClient as security is enabled");
}
aliasMaps = new ArrayList<>();
}
private InMemoryAliasMapProtocol getAliasMap(String blockPoolID)
throws IOException {
if (blockPoolID == null) {
throw new IOException("Block pool id required to get aliasmap reader");
}
// if a block pool id has been supplied, and doesn't match the associated
// block pool ids, return null.
for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
try {
String aliasMapBlockPoolId = aliasMap.getBlockPoolId();
if (aliasMapBlockPoolId != null &&
aliasMapBlockPoolId.equals(blockPoolID)) {
return aliasMap;
}
} catch (IOException e) {
LOG.error("Exception in retrieving block pool id {}", e);
}
}
throw new IOException(
"Unable to retrieve InMemoryAliasMap for block pool id " + blockPoolID);
}
@Override
public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
throws IOException {
InMemoryAliasMapProtocol aliasMap = getAliasMap(blockPoolID);
LOG.info("Loading InMemoryAliasMapReader for block pool id {}",
blockPoolID);
return new LevelDbReader(aliasMap);
}
@Override
public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolID)
throws IOException {
InMemoryAliasMapProtocol aliasMap = getAliasMap(blockPoolID);
LOG.info("Loading InMemoryAliasMapWriter for block pool id {}",
blockPoolID);
return new LevelDbWriter(aliasMap);
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
aliasMaps = InMemoryAliasMapProtocolClientSideTranslatorPB.init(conf);
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void refresh() throws IOException {
}
}