blob: 7558ab85bd4be827e148f5ebc4651dc67bea809c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.TreeMap;
import static org.apache.hadoop.ozone.om.exceptions.OMException.
ResultCodes.FILE_NOT_FOUND;
import static org.apache.hadoop.ozone.om.lock.
OzoneManagerLock.Resource.BUCKET_LOCK;
/**
 * Helper class for fetching List Status for a path.
 *
 * <p>Entries are collected from four sources: the directory table cache,
 * the file (key) table cache, and the directory and file tables in the DB.
 * Each source yields keys in sorted order. A min heap merges the sources
 * into one lexicographically sorted result.
 */
public class OzoneListStatusHelper {

  /**
   * Interface to get the File Status for a path.
   */
  @FunctionalInterface
  public interface GetFileStatusHelper {
    OzoneFileStatus apply(OmKeyArgs args, String clientAddress,
        boolean skipFileNotFoundError) throws IOException;
  }

  /**
   * Interface for iteration of Heap Entries.
   */
  public interface ClosableIterator extends Iterator<HeapEntry>, Closeable {
  }

  private static final Logger LOG =
      LoggerFactory.getLogger(OzoneListStatusHelper.class);

  private final OMMetadataManager metadataManager;
  private final long scmBlockSize;
  private final GetFileStatusHelper getStatusHelper;

  OzoneListStatusHelper(OMMetadataManager metadataManager, long scmBlockSize,
      GetFileStatusHelper func) {
    this.metadataManager = metadataManager;
    this.scmBlockSize = scmBlockSize;
    this.getStatusHelper = func;
  }

  /**
   * Lists the children of a path, sorted by their DB key.
   *
   * @param args key args naming the volume, bucket and path to list
   * @param startKey if not blank, listing starts at this key (inclusive).
   *     When the key name in {@code args} is also set, startKey must be a
   *     sibling or an immediate child of it, otherwise nothing is returned.
   *     When the key name is blank, the parent directory of startKey is
   *     listed.
   * @param numEntries maximum number of entries to return
   * @param clientAddress passed through to the file status helper
   * @param allowPartialPrefixes passed to the file status helper as its
   *     {@code skipFileNotFoundError} flag; a key name that does not
   *     resolve to a file or directory is then treated as a key prefix
   * @return the matching statuses; a single entry when the key name is a
   *     file; an empty collection when the volume, bucket or parent
   *     directory does not exist
   * @throws IOException if reading metadata fails
   */
  public Collection<OzoneFileStatus> listStatusFSO(OmKeyArgs args,
      String startKey, long numEntries, String clientAddress,
      boolean allowPartialPrefixes) throws IOException {
    Preconditions.checkNotNull(args, "Key args can not be null");

    final String volumeName = args.getVolumeName();
    final String bucketName = args.getBucketName();
    String keyName = args.getKeyName();
    String prefixKey = keyName;

    final String volumeKey = metadataManager.getVolumeKey(volumeName);
    final String bucketKey = metadataManager.getBucketKey(volumeName,
        bucketName);
    final OmVolumeArgs volumeInfo = metadataManager.getVolumeTable()
        .get(volumeKey);
    final OmBucketInfo omBucketInfo = metadataManager.getBucketTable()
        .get(bucketKey);

    if (volumeInfo == null || omBucketInfo == null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} does not exist.", (volumeInfo == null) ?
            "Volume : " + volumeName :
            "Bucket: " + volumeName + "/" + bucketName);
      }
      return new ArrayList<>();
    }

    // If the keyName is blank, the path to list is derived from the
    // startKey (its parent directory).
    if (StringUtils.isNotBlank(startKey)) {
      if (StringUtils.isNotBlank(keyName)) {
        if (!OzoneFSUtils.isSibling(keyName, startKey) &&
            !OzoneFSUtils.isImmediateChild(keyName, startKey)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("StartKey {} is not an immediate child or not a sibling"
                + " of keyName {}. Returns empty list", startKey, keyName);
          }
          return new ArrayList<>();
        }
      } else {
        keyName = OzoneFSUtils.getParentDir(startKey);
        prefixKey = keyName;
        args = args.toBuilder()
            .setKeyName(keyName)
            .setSortDatanodesInPipeline(false)
            .build();
      }
    }

    OzoneFileStatus fileStatus =
        getStatusHelper.apply(args, clientAddress, allowPartialPrefixes);

    String dbPrefixKey;
    if (fileStatus == null) {
      // The key name is not a valid filesystem path, so it is treated as a
      // key prefix (list keys mode). Build the DB key from the prefix path.
      try {
        dbPrefixKey = getDbKey(keyName, args, volumeInfo, omBucketInfo);
        prefixKey = OzoneFSUtils.getParentDir(keyName);
      } catch (OMException ome) {
        if (ome.getResult() == FILE_NOT_FOUND) {
          // The parent directory cannot be found: return an empty list.
          if (LOG.isDebugEnabled()) {
            LOG.debug("Parent directory of keyName:{} does not exist. " +
                "Returns empty list", keyName);
          }
          return new ArrayList<>();
        }
        throw ome;
      }
    } else {
      // A file has no children: the result is the file itself.
      if (fileStatus.isFile()) {
        return Collections.singletonList(fileStatus);
      }
      // Children of a directory are keyed under the directory's object id.
      long id = getId(fileStatus, omBucketInfo);
      final long volumeId = volumeInfo.getObjectID();
      final long bucketId = omBucketInfo.getObjectID();
      dbPrefixKey = metadataManager.getOzonePathKey(volumeId, bucketId,
          id, "");
    }

    // Determine the DB key iteration starts from. If the start key's parent
    // does not exist, iteration starts at the beginning of the prefix.
    String startKeyPrefix = "";
    try {
      if (!Strings.isNullOrEmpty(startKey)) {
        startKeyPrefix = getDbKey(startKey, args, volumeInfo, omBucketInfo);
      }
    } catch (OMException ome) {
      if (ome.getResult() != FILE_NOT_FOUND) {
        throw ome;
      }
    }

    // Keyed by DB key, so the values come back in the heap's sorted order.
    TreeMap<String, OzoneFileStatus> map = new TreeMap<>();
    BucketLayout bucketLayout = omBucketInfo.getBucketLayout();
    // Fetch the sorted output with a min heap iterator: every removal from
    // the heap yields the smallest remaining entry.
    try (MinHeapIterator heapIterator = new MinHeapIterator(metadataManager,
        dbPrefixKey, bucketLayout, startKeyPrefix, volumeName, bucketName)) {
      while (map.size() < numEntries && heapIterator.hasNext()) {
        HeapEntry entry = heapIterator.next();
        if (map.containsKey(entry.key)) {
          // The same key came from both a cache and the DB. The heap yields
          // the cache entry first (see HeapEntry#compareTo), and the cache
          // holds the latest value, so the DB copy is skipped.
          continue;
        }
        map.put(entry.key, entry.getStatus(prefixKey, scmBlockSize,
            volumeName, bucketName));
      }
    }
    return map.values();
  }

  /**
   * Builds the DB key of {@code key} from the volume and bucket object ids,
   * the object id of the key's parent directory, and the key's file name
   * (see {@code OMMetadataManager#getOzonePathKey}).
   *
   * <p>Only the parent is resolved, so {@code key} itself need not exist.
   * This is what allows partial prefixes and start keys to be used.
   *
   * @throws IOException if the parent lookup fails; callers treat an
   *     {@link OMException} with {@code FILE_NOT_FOUND} as "parent missing"
   */
  private String getDbKey(String key, OmKeyArgs args,
      OmVolumeArgs volumeInfo,
      OmBucketInfo omBucketInfo) throws IOException {
    String parent = OzoneFSUtils.getParentDir(key);
    OmKeyArgs parentArgs = args.toBuilder()
        .setKeyName(parent)
        .setSortDatanodesInPipeline(false)
        .build();
    OzoneFileStatus parentStatus = getStatusHelper.apply(parentArgs,
        null, false);
    Preconditions.checkNotNull(parentStatus);
    long parentId = getId(parentStatus, omBucketInfo);
    final long volumeId = volumeInfo.getObjectID();
    final long bucketId = omBucketInfo.getObjectID();
    return metadataManager.getOzonePathKey(volumeId, bucketId, parentId,
        OzoneFSUtils.getFileName(key));
  }

  /**
   * Returns the object id that children of {@code fileStatus} are keyed
   * under: the key's own object id, or the bucket's object id when the
   * status carries no key info (the bucket root).
   */
  private long getId(OzoneFileStatus fileStatus, OmBucketInfo omBucketInfo) {
    if (fileStatus.getKeyInfo() != null) {
      return fileStatus.getKeyInfo().getObjectID();
    }
    // Listing the root directory.
    return omBucketInfo.getObjectID();
  }

  /**
   * Enum of types of entries in the heap.
   */
  public enum EntryType {
    DIR_CACHE,
    FILE_CACHE,
    RAW_DIR_DB,
    RAW_FILE_DB;

    /**
     * @return true if entries of this type are directories.
     */
    public boolean isDir() {
      switch (this) {
        case DIR_CACHE:
        case RAW_DIR_DB:
          return true;
        case FILE_CACHE:
        case RAW_FILE_DB:
          return false;
        default:
          throw new IllegalArgumentException();
      }
    }

    /**
     * @return true if entries of this type come from a table cache rather
     *     than from the DB.
     */
    public boolean isCache() {
      switch (this) {
        case DIR_CACHE:
        case FILE_CACHE:
          return true;
        case RAW_DIR_DB:
        case RAW_FILE_DB:
          return false;
        default:
          throw new IllegalArgumentException();
      }
    }
  }

  /**
   * Entry to be added to the heap: a DB key, the source it came from, and
   * the stored value ({@link OmDirectoryInfo} or {@link OmKeyInfo}).
   */
  private static class HeapEntry implements Comparable<HeapEntry> {
    private final EntryType entryType;
    private final String key;
    private final Object value;

    HeapEntry(EntryType entryType, String key, Object value) {
      Preconditions.checkArgument(
          value instanceof OmDirectoryInfo ||
              value instanceof OmKeyInfo);
      this.entryType = entryType;
      this.key = key;
      this.value = value;
    }

    /**
     * Orders by DB key. Entries with equal keys are ordered cache first and
     * then by entry type, so the heap never breaks such ties arbitrarily.
     */
    @Override
    public int compareTo(HeapEntry other) {
      int byKey = this.key.compareTo(other.key);
      if (byKey != 0) {
        return byKey;
      }
      if (entryType.isCache() != other.entryType.isCache()) {
        return entryType.isCache() ? -1 : 1;
      }
      return entryType.compareTo(other.entryType);
    }

    // Kept consistent with compareTo: equal iff same key and same source.
    @Override
    public boolean equals(Object other) {
      if (this == other) {
        return true;
      }
      if (!(other instanceof HeapEntry)) {
        return false;
      }
      HeapEntry that = (HeapEntry) other;
      return this.key.equals(that.key) && this.entryType == that.entryType;
    }

    @Override
    public int hashCode() {
      return 31 * key.hashCode() + entryType.hashCode();
    }

    /**
     * Converts this entry into a file status whose key name is the full
     * path under {@code prefixPath}.
     */
    public OzoneFileStatus getStatus(String prefixPath, long scmBlockSize,
        String volumeName, String bucketName) {
      OmKeyInfo keyInfo;
      if (entryType.isDir()) {
        Preconditions.checkArgument(value instanceof OmDirectoryInfo);
        OmDirectoryInfo dirInfo = (OmDirectoryInfo) value;
        String dirName = OMFileRequest.getAbsolutePath(prefixPath,
            dirInfo.getName());
        keyInfo = OMFileRequest.getOmKeyInfo(volumeName,
            bucketName, dirInfo, dirName);
      } else {
        Preconditions.checkArgument(value instanceof OmKeyInfo);
        // FILE_CACHE values are the objects held by the table cache itself
        // (CacheIter stores them without copying). Rename a copy so the
        // cached entry's key name is never rewritten in place.
        keyInfo = entryType == EntryType.FILE_CACHE
            ? ((OmKeyInfo) value).copyObject()
            : (OmKeyInfo) value;
        // The stored key name is treated as the leaf name: keep it as the
        // file name and expose the full path as the key name.
        keyInfo.setFileName(keyInfo.getKeyName());
        String fullKeyPath = OMFileRequest.getAbsolutePath(prefixPath,
            keyInfo.getKeyName());
        keyInfo.setKeyName(fullKeyPath);
      }
      return new OzoneFileStatus(keyInfo, scmBlockSize, entryType.isDir());
    }
  }

  /**
   * Iterator for DB entries in a Dir and File Table.
   *
   * <p>It yields the entries whose keys start with a prefix. Keys that
   * {@code KeyManagerImpl.isKeyDeleted} reports as deleted are skipped.
   */
  private static class RawIter<Value> implements ClosableIterator {
    private final EntryType iterType;
    private final String prefixKey;
    private final TableIterator<String,
        ? extends Table.KeyValue<String, Value>> tableIterator;
    private final Table<String, Value> table;
    // The next entry to hand out. It is null when nothing has been read
    // ahead or the iteration is exhausted.
    private HeapEntry currentKey;

    RawIter(EntryType iterType, Table<String, Value> table,
        String prefixKey, String startKey) throws IOException {
      this.iterType = iterType;
      this.table = table;
      this.tableIterator = table.iterator();
      this.prefixKey = prefixKey;
      this.currentKey = null;
      try {
        if (!StringUtils.isBlank(prefixKey)) {
          tableIterator.seek(prefixKey);
        }
        // Only seek to the start key when it sorts after the prefix key.
        //   Prefix key = 1024/c, Start key = 1024/a -> do not seek
        //   Prefix key = 1024/a, Start key = 1024/c -> seek to start key
        if (!StringUtils.isBlank(startKey) &&
            startKey.compareTo(prefixKey) > 0) {
          tableIterator.seek(startKey);
        }
        getNextKey();
      } catch (IOException | RuntimeException e) {
        // A failed constructor never hands this object out, so nobody else
        // could close the underlying table iterator.
        try {
          tableIterator.close();
        } catch (IOException closeFailure) {
          e.addSuppressed(closeFailure);
        }
        throw e;
      }
    }

    /**
     * Reads ahead to the next live entry under the prefix, unless one is
     * already buffered. Stops at the first key outside the prefix.
     */
    private void getNextKey() throws IOException {
      while (tableIterator.hasNext() && currentKey == null) {
        Table.KeyValue<String, Value> entry = tableIterator.next();
        String entryKey = entry.getKey();
        if (!entryKey.startsWith(prefixKey)) {
          // The iterator has moved past the prefix.
          break;
        }
        if (!KeyManagerImpl.isKeyDeleted(entryKey, table)) {
          currentKey = new HeapEntry(iterType, entryKey, entry.getValue());
        }
      }
    }

    /**
     * Calls {@link #getNextKey()}. A DB read failure is rethrown as an
     * unchecked {@link NoSuchElementException} with the failure as cause.
     */
    private void advance() {
      try {
        getNextKey();
      } catch (IOException e) {
        NoSuchElementException failure = new NoSuchElementException(
            "Failed to read next entry with prefix " + prefixKey);
        failure.initCause(e);
        throw failure;
      }
    }

    @Override
    public boolean hasNext() {
      advance();
      return currentKey != null;
    }

    @Override
    public HeapEntry next() {
      advance();
      if (currentKey == null) {
        throw new NoSuchElementException(
            "No more entries with prefix " + prefixKey);
      }
      HeapEntry ret = currentKey;
      currentKey = null;
      return ret;
    }

    @Override
    public void close() throws IOException {
      tableIterator.close();
    }
  }

  /**
   * Iterator for Cache entries in a Dir and File Table.
   *
   * <p>The matching, non-deleted cache entries are copied into a sorted
   * map when the iterator is constructed. Iteration then walks that map.
   */
  private static class CacheIter<Value extends WithParentObjectId>
      implements ClosableIterator {
    private final Map<String, Value> cacheKeyMap;
    private final Iterator<Map.Entry<String, Value>>
        cacheCreatedKeyIter;
    private final Iterator<Map.Entry<CacheKey<String>, CacheValue<Value>>>
        cacheIter;
    private final String prefixKey;
    private final String startKey;
    private final EntryType entryType;

    CacheIter(EntryType entryType, Iterator<Map.Entry<CacheKey<String>,
        CacheValue<Value>>> cacheIter, String startKey, String prefixKey) {
      this.cacheKeyMap = new TreeMap<>();
      this.cacheIter = cacheIter;
      this.startKey = startKey;
      this.prefixKey = prefixKey;
      this.entryType = entryType;
      getCacheValues();
      cacheCreatedKeyIter = cacheKeyMap.entrySet().iterator();
    }

    /**
     * Collects cache entries that start with the prefix key and, when a
     * start key is given, sort at or after it (inclusive). Matching on the
     * full prefix, e.g. "1024/", avoids picking up a sibling parent id such
     * as "102444/".
     */
    private void getCacheValues() {
      while (cacheIter.hasNext()) {
        Map.Entry<CacheKey<String>, CacheValue<Value>> entry =
            cacheIter.next();
        String cacheKey = entry.getKey().getCacheKey();
        Value cacheOmInfo = entry.getValue().getCacheValue();
        // A null cache value means the entry was deleted in the cache.
        if (cacheOmInfo == null) {
          continue;
        }
        boolean inPrefix = cacheKey.startsWith(prefixKey);
        boolean atOrAfterStart = StringUtils.isBlank(startKey) ||
            cacheKey.compareTo(startKey) >= 0;
        if (inPrefix && atOrAfterStart) {
          cacheKeyMap.put(cacheKey, cacheOmInfo);
        }
      }
    }

    @Override
    public boolean hasNext() {
      return cacheCreatedKeyIter.hasNext();
    }

    @Override
    public HeapEntry next() {
      Map.Entry<String, Value> entry = cacheCreatedKeyIter.next();
      return new HeapEntry(entryType, entry.getKey(), entry.getValue());
    }

    @Override
    public void close() {
      // Nothing to close here.
    }
  }

  /**
   * Implements lexicographical sorting of file statuses across multiple
   * lists, each of which is already sorted internally.
   *
   * <p>The iterator is a min heap seeded with the first element of each
   * sorted list. Removing the least entry refills the heap with the next
   * entry from the list that entry came from.
   *
   * <p>For example:
   * <pre>
   * RawDir  - a1, a3, a5, a7
   * RawFile - a2, a4, a6, a8
   * </pre>
   * The heap starts as {(a1, RawDir), (a2, RawFile)}. Removing a1 inserts
   * the next RawDir entry, giving {(a2, RawFile), (a3, RawDir)}. This is
   * repeated until every list is exhausted.
   */
  private static class MinHeapIterator implements ClosableIterator {
    private final PriorityQueue<HeapEntry> minHeap = new PriorityQueue<>();
    // One source per entry type. EnumMap iterates in declaration order.
    private final Map<EntryType, ClosableIterator> iterators =
        new EnumMap<>(EntryType.class);

    MinHeapIterator(OMMetadataManager omMetadataManager, String prefixKey,
        BucketLayout bucketLayout, String startKey,
        String volumeName, String bucketName) throws IOException {
      try {
        openSources(omMetadataManager, prefixKey, bucketLayout, startKey,
            volumeName, bucketName);
        // Insert the first element of every source.
        for (ClosableIterator source : iterators.values()) {
          if (source.hasNext()) {
            minHeap.add(source.next());
          }
        }
      } catch (IOException | RuntimeException e) {
        // The caller's try-with-resources never receives a half-built
        // iterator, so release whatever was already opened here.
        try {
          closeSources();
        } catch (IOException closeFailure) {
          e.addSuppressed(closeFailure);
        }
        throw e;
      }
    }

    /**
     * Opens the cache and DB iterators of the directory and file tables
     * under the bucket read lock. The lock is released even if opening a
     * source fails.
     */
    private void openSources(OMMetadataManager omMetadataManager,
        String prefixKey, BucketLayout bucketLayout, String startKey,
        String volumeName, String bucketName) throws IOException {
      omMetadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
          bucketName);
      try {
        iterators.put(EntryType.DIR_CACHE,
            new CacheIter<>(EntryType.DIR_CACHE,
                omMetadataManager.getDirectoryTable().cacheIterator(),
                startKey, prefixKey));
        iterators.put(EntryType.FILE_CACHE,
            new CacheIter<>(EntryType.FILE_CACHE,
                omMetadataManager.getKeyTable(bucketLayout).cacheIterator(),
                startKey, prefixKey));
        iterators.put(EntryType.RAW_DIR_DB,
            new RawIter<>(EntryType.RAW_DIR_DB,
                omMetadataManager.getDirectoryTable(),
                prefixKey, startKey));
        iterators.put(EntryType.RAW_FILE_DB,
            new RawIter<>(EntryType.RAW_FILE_DB,
                omMetadataManager.getKeyTable(bucketLayout),
                prefixKey, startKey));
      } finally {
        omMetadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName,
            bucketName);
      }
    }

    @Override
    public boolean hasNext() {
      return !minHeap.isEmpty();
    }

    @Override
    public HeapEntry next() {
      // Remove the least element, then reinsert the next element from the
      // same source.
      HeapEntry heapEntry = minHeap.remove();
      ClosableIterator source = iterators.get(heapEntry.entryType);
      if (source.hasNext()) {
        minHeap.add(source.next());
      }
      return heapEntry;
    }

    @Override
    public void close() throws IOException {
      closeSources();
    }

    /**
     * Closes every opened source, even if some fail. Throws the first
     * failure, with any later ones attached as suppressed exceptions.
     */
    private void closeSources() throws IOException {
      IOException failure = null;
      for (ClosableIterator source : iterators.values()) {
        try {
          source.close();
        } catch (IOException e) {
          if (failure == null) {
            failure = e;
          } else {
            failure.addSuppressed(e);
          }
        }
      }
      if (failure != null) {
        throw failure;
      }
    }
  }
}