| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.nio.ByteBuffer; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Map.Entry; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathHandle; |
| import org.apache.hadoop.fs.RawPathHandle; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.server.common.FileRegion; |
| import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; |
| import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; |
| import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; |
| import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; |
| import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; |
| import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; |
| import org.apache.hadoop.hdfs.server.datanode.StorageLocation; |
| import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; |
| import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; |
| import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; |
| import org.apache.hadoop.util.Timer; |
| import org.apache.hadoop.util.DiskChecker.DiskErrorException; |
| import org.apache.hadoop.util.AutoCloseableLock; |
| import org.codehaus.jackson.annotate.JsonProperty; |
| import org.codehaus.jackson.map.ObjectMapper; |
| import org.codehaus.jackson.map.ObjectReader; |
| import org.codehaus.jackson.map.ObjectWriter; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.Time; |
| |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES; |
| |
| /** |
| * This class is used to create provided volumes. |
| */ |
| @InterfaceAudience.Private |
| class ProvidedVolumeImpl extends FsVolumeImpl { |
| |
| /** |
| * Get a suffix of the full path, excluding the given prefix. |
| * |
| * @param prefix a prefix of the path. |
| * @param fullPath the full path whose suffix is needed. |
| * @return the suffix of the path, which when resolved against {@code prefix} |
| * gets back the {@code fullPath}. |
| */ |
| @VisibleForTesting |
| protected static String getSuffix(final Path prefix, final Path fullPath) { |
| String prefixStr = prefix.toString(); |
| String pathStr = fullPath.toString(); |
| if (!pathStr.startsWith(prefixStr)) { |
| LOG.debug("Path {} is not a prefix of the path {}", prefix, fullPath); |
| return pathStr; |
| } |
| String suffix = pathStr.replaceFirst("^" + prefixStr, ""); |
| if (suffix.startsWith("/")) { |
| suffix = suffix.substring(1); |
| } |
| return suffix; |
| } |
| |
| /** |
| * Class to keep track of the capacity usage statistics for provided volumes. |
| */ |
| public static class ProvidedVolumeDF { |
| |
| private AtomicLong used = new AtomicLong(); |
| |
| public long getSpaceUsed() { |
| return used.get(); |
| } |
| |
| public void decDfsUsed(long value) { |
| used.addAndGet(-value); |
| } |
| |
| public void incDfsUsed(long value) { |
| used.addAndGet(value); |
| } |
| |
| public long getCapacity() { |
| return getSpaceUsed(); |
| } |
| } |
| |
| static class ProvidedBlockPoolSlice { |
| private ProvidedVolumeImpl providedVolume; |
| |
| private BlockAliasMap<FileRegion> aliasMap; |
| private Configuration conf; |
| private String bpid; |
| private ReplicaMap bpVolumeMap; |
| private ProvidedVolumeDF df; |
| private AtomicLong numOfBlocks = new AtomicLong(); |
| private int numRetries; |
| |
| ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume, |
| Configuration conf) { |
| this.providedVolume = volume; |
| bpVolumeMap = new ReplicaMap(new AutoCloseableLock()); |
| Class<? extends BlockAliasMap> fmt = |
| conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS, |
| TextFileRegionAliasMap.class, BlockAliasMap.class); |
| aliasMap = ReflectionUtils.newInstance(fmt, conf); |
| this.conf = conf; |
| this.bpid = bpid; |
| this.df = new ProvidedVolumeDF(); |
| bpVolumeMap.initBlockPool(bpid); |
| this.numRetries = conf.getInt(DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 0); |
| LOG.info("Created alias map using class: " + aliasMap.getClass()); |
| } |
| |
| BlockAliasMap<FileRegion> getBlockAliasMap() { |
| return aliasMap; |
| } |
| |
| @VisibleForTesting |
| void setFileRegionProvider(BlockAliasMap<FileRegion> blockAliasMap) { |
| this.aliasMap = blockAliasMap; |
| } |
| |
| void fetchVolumeMap(ReplicaMap volumeMap, |
| RamDiskReplicaTracker ramDiskReplicaMap, FileSystem remoteFS) |
| throws IOException { |
| BlockAliasMap.Reader<FileRegion> reader = null; |
| int tries = 1; |
| do { |
| try { |
| reader = aliasMap.getReader(null, bpid); |
| break; |
| } catch (IOException e) { |
| tries++; |
| reader = null; |
| } |
| } while (tries <= numRetries); |
| |
| if (reader == null) { |
| LOG.error("Got null reader from BlockAliasMap " + aliasMap |
| + "; no blocks will be populated"); |
| return; |
| } |
| Path blockPrefixPath = new Path(providedVolume.getBaseURI()); |
| for (FileRegion region : reader) { |
| if (containsBlock(providedVolume.baseURI, |
| region.getProvidedStorageLocation().getPath().toUri())) { |
| String blockSuffix = getSuffix(blockPrefixPath, |
| new Path(region.getProvidedStorageLocation().getPath().toUri())); |
| PathHandle pathHandle = null; |
| if (region.getProvidedStorageLocation().getNonce().length > 0) { |
| pathHandle = new RawPathHandle(ByteBuffer |
| .wrap(region.getProvidedStorageLocation().getNonce())); |
| } |
| ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED) |
| .setBlockId(region.getBlock().getBlockId()) |
| .setPathPrefix(blockPrefixPath) |
| .setPathSuffix(blockSuffix) |
| .setOffset(region.getProvidedStorageLocation().getOffset()) |
| .setLength(region.getBlock().getNumBytes()) |
| .setGenerationStamp(region.getBlock().getGenerationStamp()) |
| .setPathHandle(pathHandle) |
| .setFsVolume(providedVolume) |
| .setConf(conf) |
| .setRemoteFS(remoteFS) |
| .build(); |
| ReplicaInfo oldReplica = |
| volumeMap.get(bpid, newReplica.getBlockId()); |
| if (oldReplica == null) { |
| volumeMap.add(bpid, newReplica); |
| bpVolumeMap.add(bpid, newReplica); |
| incrNumBlocks(); |
| incDfsUsed(region.getBlock().getNumBytes()); |
| } else { |
| LOG.warn("A block with id " + newReplica.getBlockId() |
| + " exists locally. Skipping PROVIDED replica"); |
| } |
| } |
| } |
| } |
| |
| private void incrNumBlocks() { |
| numOfBlocks.incrementAndGet(); |
| } |
| |
| public boolean isEmpty() { |
| return bpVolumeMap.replicas(bpid).size() == 0; |
| } |
| |
| public void shutdown(BlockListAsLongs blocksListsAsLongs) { |
| // nothing to do! |
| } |
| |
| public void compileReport(LinkedList<ScanInfo> report, |
| ReportCompiler reportCompiler) |
| throws IOException, InterruptedException { |
| /* refresh the aliasMap and return the list of blocks found. |
| * the assumption here is that the block ids in the external |
| * block map, after the refresh, are consistent with those |
| * from before the refresh, i.e., for blocks which did not change, |
| * the ids remain the same. |
| */ |
| aliasMap.refresh(); |
| BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null, bpid); |
| for (FileRegion region : reader) { |
| reportCompiler.throttle(); |
| report.add(new ScanInfo(region.getBlock().getBlockId(), |
| providedVolume, region, |
| region.getProvidedStorageLocation().getLength())); |
| } |
| } |
| |
| public long getNumOfBlocks() { |
| return numOfBlocks.get(); |
| } |
| |
| long getDfsUsed() throws IOException { |
| return df.getSpaceUsed(); |
| } |
| |
| void incDfsUsed(long value) { |
| df.incDfsUsed(value); |
| } |
| } |
| |
| private URI baseURI; |
| private final Map<String, ProvidedBlockPoolSlice> bpSlices = |
| new ConcurrentHashMap<String, ProvidedBlockPoolSlice>(); |
| |
| private ProvidedVolumeDF df; |
| // the remote FileSystem to which this ProvidedVolume points to. |
| private FileSystem remoteFS; |
| |
| ProvidedVolumeImpl(FsDatasetImpl dataset, String storageID, |
| StorageDirectory sd, FileIoProvider fileIoProvider, |
| Configuration conf) throws IOException { |
| super(dataset, storageID, sd, fileIoProvider, conf, null); |
| assert getStorageLocation().getStorageType() == StorageType.PROVIDED: |
| "Only provided storages must use ProvidedVolume"; |
| |
| baseURI = getStorageLocation().getUri(); |
| df = new ProvidedVolumeDF(); |
| remoteFS = FileSystem.get(baseURI, conf); |
| } |
| |
| @Override |
| public String[] getBlockPoolList() { |
| return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]); |
| } |
| |
| @Override |
| public long getCapacity() { |
| try { |
| // default to whatever is the space used! |
| return getDfsUsed(); |
| } catch (IOException e) { |
| LOG.warn("Exception when trying to get capacity of ProvidedVolume: {}", |
| e); |
| } |
| return 0L; |
| } |
| |
| @Override |
| public long getDfsUsed() throws IOException { |
| long dfsUsed = 0; |
| synchronized(getDataset()) { |
| for(ProvidedBlockPoolSlice s : bpSlices.values()) { |
| dfsUsed += s.getDfsUsed(); |
| } |
| } |
| return dfsUsed; |
| } |
| |
| @Override |
| long getBlockPoolUsed(String bpid) throws IOException { |
| return getProvidedBlockPoolSlice(bpid).getDfsUsed(); |
| } |
| |
| @Override |
| public long getAvailable() throws IOException { |
| long remaining = getCapacity() - getDfsUsed(); |
| // do not report less than 0 remaining space for PROVIDED storage |
| // to prevent marking it as over capacity on NN |
| if (remaining < 0L) { |
| LOG.warn("Volume {} has less than 0 available space", this); |
| return 0L; |
| } |
| return remaining; |
| } |
| |
| @Override |
| long getActualNonDfsUsed() throws IOException { |
| return 0L; |
| } |
| |
| @Override |
| public long getNonDfsUsed() throws IOException { |
| return 0L; |
| } |
| |
| @Override |
| long getNumBlocks() { |
| long numBlocks = 0; |
| for (ProvidedBlockPoolSlice s : bpSlices.values()) { |
| numBlocks += s.getNumOfBlocks(); |
| } |
| return numBlocks; |
| } |
| |
| @Override |
| void incDfsUsedAndNumBlocks(String bpid, long value) { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| @Override |
| public URI getBaseURI() { |
| return baseURI; |
| } |
| |
| @Override |
| public File getFinalizedDir(String bpid) throws IOException { |
| return null; |
| } |
| |
| @Override |
| public void reserveSpaceForReplica(long bytesToReserve) { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| @Override |
| public void releaseReservedSpace(long bytesToRelease) { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| private static final ObjectWriter WRITER = |
| new ObjectMapper().writerWithDefaultPrettyPrinter(); |
| private static final ObjectReader READER = |
| new ObjectMapper().reader(ProvidedBlockIteratorState.class); |
| |
| private static class ProvidedBlockIteratorState { |
| ProvidedBlockIteratorState() { |
| iterStartMs = Time.now(); |
| lastSavedMs = iterStartMs; |
| atEnd = false; |
| lastBlockId = -1; |
| } |
| |
| // The wall-clock ms since the epoch at which this iterator was last saved. |
| @JsonProperty |
| private long lastSavedMs; |
| |
| // The wall-clock ms since the epoch at which this iterator was created. |
| @JsonProperty |
| private long iterStartMs; |
| |
| @JsonProperty |
| private boolean atEnd; |
| |
| // The id of the last block read when the state of the iterator is saved. |
| // This implementation assumes that provided blocks are returned |
| // in sorted order of the block ids. |
| @JsonProperty |
| private long lastBlockId; |
| } |
| |
| private class ProviderBlockIteratorImpl |
| implements FsVolumeSpi.BlockIterator { |
| |
| private String bpid; |
| private String name; |
| private BlockAliasMap<FileRegion> blockAliasMap; |
| private Iterator<FileRegion> blockIterator; |
| private ProvidedBlockIteratorState state; |
| |
| ProviderBlockIteratorImpl(String bpid, String name, |
| BlockAliasMap<FileRegion> blockAliasMap) { |
| this.bpid = bpid; |
| this.name = name; |
| this.blockAliasMap = blockAliasMap; |
| rewind(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| blockAliasMap.close(); |
| } |
| |
| @Override |
| public ExtendedBlock nextBlock() throws IOException { |
| if (null == blockIterator || !blockIterator.hasNext()) { |
| return null; |
| } |
| FileRegion nextRegion = null; |
| while (null == nextRegion && blockIterator.hasNext()) { |
| FileRegion temp = blockIterator.next(); |
| if (temp.getBlock().getBlockId() < state.lastBlockId) { |
| continue; |
| } |
| nextRegion = temp; |
| } |
| if (null == nextRegion) { |
| return null; |
| } |
| state.lastBlockId = nextRegion.getBlock().getBlockId(); |
| return new ExtendedBlock(bpid, nextRegion.getBlock()); |
| } |
| |
| @Override |
| public boolean atEnd() { |
| return blockIterator != null ? !blockIterator.hasNext(): true; |
| } |
| |
| @Override |
| public void rewind() { |
| BlockAliasMap.Reader<FileRegion> reader = null; |
| try { |
| reader = blockAliasMap.getReader(null, bpid); |
| } catch (IOException e) { |
| LOG.warn("Exception in getting reader from provided alias map"); |
| } |
| if (reader != null) { |
| blockIterator = reader.iterator(); |
| } else { |
| blockIterator = null; |
| } |
| state = new ProvidedBlockIteratorState(); |
| } |
| |
| @Override |
| public void save() throws IOException { |
| // We do not persist the state of this iterator locally. |
| // We just re-scan provided volumes as necessary. |
| state.lastSavedMs = Time.now(); |
| } |
| |
| @Override |
| public void setMaxStalenessMs(long maxStalenessMs) { |
| // do not use max staleness |
| } |
| |
| @Override |
| public long getIterStartMs() { |
| return state.iterStartMs; |
| } |
| |
| @Override |
| public long getLastSavedMs() { |
| return state.lastSavedMs; |
| } |
| |
| @Override |
| public String getBlockPoolId() { |
| return bpid; |
| } |
| |
| public void load() throws IOException { |
| // on load, we just rewind the iterator for provided volumes. |
| rewind(); |
| LOG.trace("load({}, {}): loaded iterator {}: {}", getStorageID(), |
| bpid, name, WRITER.writeValueAsString(state)); |
| } |
| } |
| |
| @Override |
| public BlockIterator newBlockIterator(String bpid, String name) { |
| return new ProviderBlockIteratorImpl(bpid, name, |
| bpSlices.get(bpid).getBlockAliasMap()); |
| } |
| |
| @Override |
| public BlockIterator loadBlockIterator(String bpid, String name) |
| throws IOException { |
| ProviderBlockIteratorImpl iter = new ProviderBlockIteratorImpl(bpid, name, |
| bpSlices.get(bpid).getBlockAliasMap()); |
| iter.load(); |
| return iter; |
| } |
| |
| @Override |
| ReplicaInfo addFinalizedBlock(String bpid, Block b, |
| ReplicaInfo replicaInfo, long bytesReserved) throws IOException { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| @Override |
| public VolumeCheckResult check(VolumeCheckContext ignored) |
| throws DiskErrorException { |
| return VolumeCheckResult.HEALTHY; |
| } |
| |
| @Override |
| void getVolumeMap(ReplicaMap volumeMap, |
| final RamDiskReplicaTracker ramDiskReplicaMap) |
| throws IOException { |
| LOG.info("Creating volumemap for provided volume " + this); |
| for(ProvidedBlockPoolSlice s : bpSlices.values()) { |
| s.fetchVolumeMap(volumeMap, ramDiskReplicaMap, remoteFS); |
| } |
| } |
| |
| private ProvidedBlockPoolSlice getProvidedBlockPoolSlice(String bpid) |
| throws IOException { |
| ProvidedBlockPoolSlice bp = bpSlices.get(bpid); |
| if (bp == null) { |
| throw new IOException("block pool " + bpid + " is not found"); |
| } |
| return bp; |
| } |
| |
| @Override |
| void getVolumeMap(String bpid, ReplicaMap volumeMap, |
| final RamDiskReplicaTracker ramDiskReplicaMap) |
| throws IOException { |
| getProvidedBlockPoolSlice(bpid).fetchVolumeMap(volumeMap, ramDiskReplicaMap, |
| remoteFS); |
| } |
| |
| @VisibleForTesting |
| BlockAliasMap<FileRegion> getBlockFormat(String bpid) throws IOException { |
| return getProvidedBlockPoolSlice(bpid).getBlockAliasMap(); |
| } |
| |
| @Override |
| public String toString() { |
| return this.baseURI.toString(); |
| } |
| |
| @Override |
| void addBlockPool(String bpid, Configuration conf) throws IOException { |
| addBlockPool(bpid, conf, null); |
| } |
| |
| @Override |
| void addBlockPool(String bpid, Configuration conf, Timer timer) |
| throws IOException { |
| LOG.info("Adding block pool " + bpid + |
| " to volume with id " + getStorageID()); |
| ProvidedBlockPoolSlice bp; |
| bp = new ProvidedBlockPoolSlice(bpid, this, conf); |
| bpSlices.put(bpid, bp); |
| } |
| |
| void shutdown() { |
| if (cacheExecutor != null) { |
| cacheExecutor.shutdown(); |
| } |
| Set<Entry<String, ProvidedBlockPoolSlice>> set = bpSlices.entrySet(); |
| for (Entry<String, ProvidedBlockPoolSlice> entry : set) { |
| entry.getValue().shutdown(null); |
| } |
| } |
| |
| @Override |
| void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) { |
| ProvidedBlockPoolSlice bp = bpSlices.get(bpid); |
| if (bp != null) { |
| bp.shutdown(blocksListsAsLongs); |
| } |
| bpSlices.remove(bpid); |
| } |
| |
| @Override |
| boolean isBPDirEmpty(String bpid) throws IOException { |
| return getProvidedBlockPoolSlice(bpid).isEmpty(); |
| } |
| |
| @Override |
| void deleteBPDirectories(String bpid, boolean force) throws IOException { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| @Override |
| public LinkedList<ScanInfo> compileReport(String bpid, |
| LinkedList<ScanInfo> report, ReportCompiler reportCompiler) |
| throws InterruptedException, IOException { |
| LOG.info("Compiling report for volume: " + this + " bpid " + bpid); |
| if(bpSlices.containsKey(bpid)) { |
| bpSlices.get(bpid).compileReport(report, reportCompiler); |
| } |
| return report; |
| } |
| |
| @Override |
| public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo, |
| long newGS, long estimateBlockLen) throws IOException { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| @Override |
| public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| @Override |
| public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock b, |
| ReplicaInfo temp) throws IOException { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| @Override |
| public ReplicaInPipeline createTemporary(ExtendedBlock b) |
| throws IOException { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| @Override |
| public ReplicaInPipeline updateRURCopyOnTruncate(ReplicaInfo rur, |
| String bpid, long newBlockId, long recoveryId, long newlength) |
| throws IOException { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| @Override |
| public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block, |
| ReplicaInfo replicaInfo, int smallBufferSize, |
| Configuration conf) throws IOException { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| @Override |
| public File[] copyBlockToLazyPersistLocation(String bpId, long blockId, |
| long genStamp, ReplicaInfo replicaInfo, int smallBufferSize, |
| Configuration conf) throws IOException { |
| throw new UnsupportedOperationException( |
| "ProvidedVolume does not yet support writes"); |
| } |
| |
| private static URI getAbsoluteURI(URI uri) { |
| if (!uri.isAbsolute()) { |
| // URI is not absolute implies it is for a local file |
| // normalize the URI |
| return StorageLocation.normalizeFileURI(uri); |
| } else { |
| return uri; |
| } |
| } |
| /** |
| * @param volumeURI URI of the volume |
| * @param blockURI URI of the block |
| * @return true if the {@code blockURI} can belong to the volume or both URIs |
| * are null. |
| */ |
| @VisibleForTesting |
| public static boolean containsBlock(URI volumeURI, URI blockURI) { |
| if (volumeURI == null && blockURI == null){ |
| return true; |
| } |
| if (volumeURI == null || blockURI == null) { |
| return false; |
| } |
| volumeURI = getAbsoluteURI(volumeURI); |
| blockURI = getAbsoluteURI(blockURI); |
| return !volumeURI.relativize(blockURI).equals(blockURI); |
| } |
| |
| @VisibleForTesting |
| BlockAliasMap<FileRegion> getFileRegionProvider(String bpid) throws |
| IOException { |
| return getProvidedBlockPoolSlice(bpid).getBlockAliasMap(); |
| } |
| |
| @VisibleForTesting |
| void setFileRegionProvider(String bpid, |
| BlockAliasMap<FileRegion> blockAliasMap) throws IOException { |
| ProvidedBlockPoolSlice bp = bpSlices.get(bpid); |
| if (bp == null) { |
| throw new IOException("block pool " + bpid + " is not found"); |
| } |
| bp.setFileRegionProvider(blockAliasMap); |
| } |
| } |