blob: 95f21ae0ca332543a89ebcd215bc23294a700926 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdds.utils.db.TypedTable;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.hdds.utils.db.cache.TableCacheImpl;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.codec.OmBucketInfoCodec;
import org.apache.hadoop.ozone.om.codec.OmKeyInfoCodec;
import org.apache.hadoop.ozone.om.codec.OmMultipartKeyInfoCodec;
import org.apache.hadoop.ozone.om.codec.OmPrefixInfoCodec;
import org.apache.hadoop.ozone.om.codec.OmVolumeArgsCodec;
import org.apache.hadoop.ozone.om.codec.RepeatedOmKeyInfoCodec;
import org.apache.hadoop.ozone.om.codec.S3SecretValueCodec;
import org.apache.hadoop.ozone.om.codec.TokenIdentifierCodec;
import org.apache.hadoop.ozone.om.codec.UserVolumeInfoCodec;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.UserVolumeInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Ozone metadata manager implementation: maintains OM metadata (volumes,
 * buckets, keys, tokens, secrets) in a RocksDB-backed store.
 */
public class OmMetadataManagerImpl implements OMMetadataManager {
private static final Logger LOG =
LoggerFactory.getLogger(OmMetadataManagerImpl.class);
/**
* OM RocksDB Structure .
* <p>
* OM DB stores metadata as KV pairs in different column families.
* <p>
* OM DB Schema:
* |----------------------------------------------------------------------|
* | Column Family | VALUE |
* |----------------------------------------------------------------------|
* | userTable | /user->UserVolumeInfo |
* |----------------------------------------------------------------------|
* | volumeTable | /volume->VolumeInfo |
* |----------------------------------------------------------------------|
* | bucketTable | /volume/bucket-> BucketInfo |
* |----------------------------------------------------------------------|
* | keyTable | /volumeName/bucketName/keyName->KeyInfo |
* |----------------------------------------------------------------------|
* | deletedTable | /volumeName/bucketName/keyName->RepeatedKeyInfo |
* |----------------------------------------------------------------------|
* | openKeyTable       | /volumeName/bucketName/keyName/id->KeyInfo       |
* |----------------------------------------------------------------------|
* | s3Table | s3BucketName -> /volumeName/bucketName |
* |----------------------------------------------------------------------|
* | s3SecretTable | s3g_access_key_id -> s3Secret |
* |----------------------------------------------------------------------|
* | dTokenTable        | OzoneTokenIdentifier -> renewTime (Long)         |
* |----------------------------------------------------------------------|
* | prefixTable        | prefix -> PrefixInfo                             |
* |----------------------------------------------------------------------|
* | multipartInfoTable| /volumeName/bucketName/keyName/uploadId ->... |
* |----------------------------------------------------------------------|
*/
// Column-family names; must match the schema table documented above.
public static final String USER_TABLE = "userTable";
public static final String VOLUME_TABLE = "volumeTable";
public static final String BUCKET_TABLE = "bucketTable";
public static final String KEY_TABLE = "keyTable";
public static final String DELETED_TABLE = "deletedTable";
public static final String OPEN_KEY_TABLE = "openKeyTable";
public static final String S3_TABLE = "s3Table";
public static final String MULTIPARTINFO_TABLE = "multipartInfoTable";
public static final String S3_SECRET_TABLE = "s3SecretTable";
public static final String DELEGATION_TOKEN_TABLE = "dTokenTable";
public static final String PREFIX_TABLE = "prefixTable";
// Backing RocksDB store; null until start() is called and after stop().
private DBStore store;
private final OzoneManagerLock lock;
// Age in milliseconds after which an open key is considered expired.
private final long openKeyExpireThresholdMS;
// Typed handles to the column families; populated by initializeOmTables().
private Table userTable;
private Table volumeTable;
private Table bucketTable;
private Table keyTable;
private Table deletedTable;
private Table openKeyTable;
private Table s3Table;
private Table<String, OmMultipartKeyInfo> multipartInfoTable;
private Table s3SecretTable;
private Table dTokenTable;
private Table prefixTable;
// Temporary flag: whether OM state changes go through Ratis (see ctor TODO).
private boolean isRatisEnabled;
/**
 * Creates a metadata manager and opens the backing RocksDB store.
 *
 * @param conf configuration used for the lock, the open-key expiry
 *             threshold, the Ratis flag and the DB location.
 * @throws IOException if the DB store cannot be created/opened.
 */
public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
  this.lock = new OzoneManagerLock(conf);
  // Config value is in seconds; convert to milliseconds once here.
  this.openKeyExpireThresholdMS = 1000L * conf.getInt(
      OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
      OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
  // TODO: This is a temporary check. Once fully implemented, all OM state
  // change should go through Ratis - be it standalone (for non-HA) or
  // replicated (for HA).
  isRatisEnabled = conf.getBoolean(
      OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
      OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
  // Opens the DB store and resolves all table handles.
  start(conf);
}
/**
 * For subclass overriding. Does NOT open a DB store; subclasses are
 * expected to provide one via {@link #setStore(DBStore)} or start(conf).
 */
protected OmMetadataManagerImpl() {
  this.lock = new OzoneManagerLock(new OzoneConfiguration());
  // NOTE(review): unlike the public constructor, this assigns the
  // seconds-valued default directly to a milliseconds field (no x1000
  // conversion) — confirm whether that is intentional.
  this.openKeyExpireThresholdMS =
      OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
}
/** @return table mapping user name -> {@link UserVolumeInfo} volume list. */
@Override
public Table<String, UserVolumeInfo> getUserTable() {
  return userTable;
}

/**
 * @return delegation token table (token identifier -> Long; presumably the
 * renew time — confirm against the token manager's usage).
 */
public Table<OzoneTokenIdentifier, Long> getDelegationTokenTable() {
  return dTokenTable;
}

/** @return table mapping /volume -> {@link OmVolumeArgs}. */
@Override
public Table<String, OmVolumeArgs> getVolumeTable() {
  return volumeTable;
}

/** @return table mapping /volume/bucket -> {@link OmBucketInfo}. */
@Override
public Table<String, OmBucketInfo> getBucketTable() {
  return bucketTable;
}

/** @return table mapping /volume/bucket/key -> {@link OmKeyInfo}. */
@Override
public Table<String, OmKeyInfo> getKeyTable() {
  return keyTable;
}

/** @return table of keys pending deletion (key -> RepeatedOmKeyInfo). */
@Override
public Table<String, RepeatedOmKeyInfo> getDeletedTable() {
  return deletedTable;
}

/** @return table of open (in-progress) keys keyed by /vol/bucket/key/id. */
@Override
public Table<String, OmKeyInfo> getOpenKeyTable() {
  return openKeyTable;
}

/** @return table mapping s3 bucket name -> /volumeName/bucketName. */
@Override
public Table<String, String> getS3Table() {
  return s3Table;
}

/** @return table mapping prefix -> {@link OmPrefixInfo}. */
@Override
public Table<String, OmPrefixInfo> getPrefixTable() {
  return prefixTable;
}

/** @return table of multipart-upload metadata. */
@Override
public Table<String, OmMultipartKeyInfo> getMultipartInfoTable() {
  return multipartInfoTable;
}
/**
 * Verifies that a table handle resolved from the DB store is non-null.
 *
 * @param table table handle to validate (may be null on DB inconsistency).
 * @param name  column-family name used in the log and error messages.
 * @throws IOException if the table reference is null.
 */
private void checkTableStatus(Table table, String name) throws IOException {
  String logMessage = "Unable to get a reference to %s table. Cannot " +
      "continue.";
  // Fixed: the concatenation previously produced "...check the logsfor
  // more info." (missing space between the two literals).
  String errMsg = "Inconsistent DB state, Table - %s. Please check the " +
      "logs for more info.";
  if (table == null) {
    LOG.error(String.format(logMessage, name));
    throw new IOException(String.format(errMsg, name));
  }
}
/**
 * Start metadata manager: opens the DB store (if not already open) and
 * resolves all table handles.
 *
 * @param configuration source of the OM DB directory.
 * @throws IOException if the store cannot be built or a table is missing.
 */
@Override
public void start(OzoneConfiguration configuration) throws IOException {
  // We need to create the DB here, as when during restart, stop closes the
  // db, so we need to create the store object and initialize DB.
  if (store == null) {
    File metaDir = OmUtils.getOmDbDir(configuration);
    DBStoreBuilder dbStoreBuilder = DBStoreBuilder.newBuilder(configuration)
        .setName(OM_DB_NAME)
        .setPath(Paths.get(metaDir.getPath()));
    this.store = addOMTablesAndCodecs(dbStoreBuilder).build();
    initializeOmTables();
  }
}
/**
 * Registers every OM column family and value codec on the given builder
 * (protected so subclasses can add further tables/codecs).
 *
 * @param builder DB store builder to configure.
 * @return the same builder, for call chaining.
 */
protected DBStoreBuilder addOMTablesAndCodecs(DBStoreBuilder builder) {
  return builder.addTable(USER_TABLE)
      .addTable(VOLUME_TABLE)
      .addTable(BUCKET_TABLE)
      .addTable(KEY_TABLE)
      .addTable(DELETED_TABLE)
      .addTable(OPEN_KEY_TABLE)
      .addTable(S3_TABLE)
      .addTable(MULTIPARTINFO_TABLE)
      .addTable(DELEGATION_TOKEN_TABLE)
      .addTable(S3_SECRET_TABLE)
      .addTable(PREFIX_TABLE)
      .addCodec(OzoneTokenIdentifier.class, new TokenIdentifierCodec())
      .addCodec(OmKeyInfo.class, new OmKeyInfoCodec())
      .addCodec(RepeatedOmKeyInfo.class, new RepeatedOmKeyInfoCodec())
      .addCodec(OmBucketInfo.class, new OmBucketInfoCodec())
      .addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec())
      .addCodec(UserVolumeInfo.class, new UserVolumeInfoCodec())
      .addCodec(OmMultipartKeyInfo.class, new OmMultipartKeyInfoCodec())
      .addCodec(S3SecretValue.class, new S3SecretValueCodec())
      .addCodec(OmPrefixInfo.class, new OmPrefixInfoCodec());
}
/**
 * Initialize OM Tables: fetches a typed handle for every column family
 * from the store, failing fast if any cannot be resolved.
 *
 * @throws IOException if any table reference is null (inconsistent DB).
 */
protected void initializeOmTables() throws IOException {
  userTable =
      this.store.getTable(USER_TABLE, String.class, UserVolumeInfo.class);
  checkTableStatus(userTable, USER_TABLE);
  // Volume and bucket tables use a full table cache whose entries are
  // never cleaned up.
  TableCacheImpl.CacheCleanupPolicy cleanupPolicy =
      TableCacheImpl.CacheCleanupPolicy.NEVER;
  volumeTable =
      this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class,
          cleanupPolicy);
  checkTableStatus(volumeTable, VOLUME_TABLE);
  bucketTable =
      this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class,
          cleanupPolicy);
  checkTableStatus(bucketTable, BUCKET_TABLE);
  keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class);
  checkTableStatus(keyTable, KEY_TABLE);
  deletedTable = this.store.getTable(DELETED_TABLE, String.class,
      RepeatedOmKeyInfo.class);
  checkTableStatus(deletedTable, DELETED_TABLE);
  openKeyTable =
      this.store.getTable(OPEN_KEY_TABLE, String.class, OmKeyInfo.class);
  checkTableStatus(openKeyTable, OPEN_KEY_TABLE);
  s3Table = this.store.getTable(S3_TABLE, String.class, String.class);
  checkTableStatus(s3Table, S3_TABLE);
  multipartInfoTable = this.store.getTable(MULTIPARTINFO_TABLE,
      String.class, OmMultipartKeyInfo.class);
  checkTableStatus(multipartInfoTable, MULTIPARTINFO_TABLE);
  dTokenTable = this.store.getTable(DELEGATION_TOKEN_TABLE,
      OzoneTokenIdentifier.class, Long.class);
  checkTableStatus(dTokenTable, DELEGATION_TOKEN_TABLE);
  s3SecretTable = this.store.getTable(S3_SECRET_TABLE, String.class,
      S3SecretValue.class);
  checkTableStatus(s3SecretTable, S3_SECRET_TABLE);
  prefixTable = this.store.getTable(PREFIX_TABLE, String.class,
      OmPrefixInfo.class);
  checkTableStatus(prefixTable, PREFIX_TABLE);
}
/**
 * Stop metadata manager: closes the DB store and clears the reference so
 * that a subsequent start() re-opens it.
 */
@Override
public void stop() throws Exception {
  if (store == null) {
    return;
  }
  // Close first; only drop the reference once close succeeded.
  store.close();
  store = null;
}
/**
 * Get metadata store.
 *
 * @return store - metadata store; null before start() or after stop().
 */
@VisibleForTesting
@Override
public DBStore getStore() {
  return store;
}
/**
 * Given a volume return the corresponding DB key, i.e. "/volume".
 *
 * @param volume - Volume name
 */
@Override
public String getVolumeKey(String volume) {
  // Use the statically imported OM_KEY_PREFIX for consistency with the
  // other key-builder methods in this class (same constant, same value).
  return OM_KEY_PREFIX + volume;
}
/**
 * Given a user return the corresponding DB key.
 * The user name itself is the key; no prefix is applied.
 *
 * @param user - User name
 */
@Override
public String getUserKey(String user) {
  return user;
}
/**
 * Given a volume and bucket, return the corresponding DB key
 * ("/volume/bucket", or just "/volume" when the bucket is blank).
 *
 * @param volume - User name
 * @param bucket - Bucket name
 */
@Override
public String getBucketKey(String volume, String bucket) {
  String volumePart = OM_KEY_PREFIX + volume;
  if (StringUtils.isBlank(bucket)) {
    // No bucket supplied: the key degenerates to the volume key.
    return volumePart;
  }
  return volumePart + OM_KEY_PREFIX + bucket;
}
/**
 * Builds the keyTable DB key "/volume/bucket/key". A blank key yields the
 * bucket-level key; a key equal to the prefix itself is not appended twice.
 *
 * @param volume volume name
 * @param bucket bucket name
 * @param key key name (may be blank)
 */
@Override
public String getOzoneKey(String volume, String bucket, String key) {
  StringBuilder builder = new StringBuilder()
      .append(OM_KEY_PREFIX).append(volume);
  // TODO : Throw if the Bucket is null?
  builder.append(OM_KEY_PREFIX).append(bucket);
  // Use commons-lang3 StringUtils (already imported and used elsewhere in
  // this class) instead of Jetty's StringUtil; semantics are identical.
  if (StringUtils.isNotBlank(key)) {
    builder.append(OM_KEY_PREFIX);
    if (!key.equals(OM_KEY_PREFIX)) {
      builder.append(key);
    }
  }
  return builder.toString();
}
/**
 * Builds the DB key for a directory entry: identical to
 * {@link #getOzoneKey} but guarantees a trailing slash on the key name.
 */
@Override
public String getOzoneDirKey(String volume, String bucket, String key) {
  String dirName = OzoneFSUtils.addTrailingSlashIfNeeded(key);
  return getOzoneKey(volume, bucket, dirName);
}
/**
 * Builds the openKeyTable DB key "/volume/bucket/key/id".
 *
 * @param id client-session identifier distinguishing concurrent opens.
 */
@Override
public String getOpenKey(String volume, String bucket,
    String key, long id) {
  StringBuilder openKey = new StringBuilder();
  openKey.append(OM_KEY_PREFIX).append(volume)
      .append(OM_KEY_PREFIX).append(bucket)
      .append(OM_KEY_PREFIX).append(key)
      .append(OM_KEY_PREFIX).append(id);
  return openKey.toString();
}
/**
 * Builds the multipartInfoTable DB key for an upload; delegates to
 * {@link OmMultipartUpload#getDbKey}.
 *
 * @param volume volume name
 * @param bucket bucket name
 * @param key key name
 * @param uploadId multipart upload identifier
 */
@Override
public String getMultipartKey(String volume, String bucket, String key,
    String
    uploadId) {
  return OmMultipartUpload.getDbKey(volume, bucket, key, uploadId);
}
/**
 * Returns the OzoneManagerLock used on Metadata DB.
 * Never null: initialized by every constructor.
 *
 * @return OzoneManagerLock
 */
@Override
public org.apache.hadoop.ozone.om.lock.OzoneManagerLock getLock() {
  return lock;
}
/**
 * Returns true if the firstArray startsWith the bytes of secondArray.
 * Two nulls count as a match; exactly one null never matches.
 *
 * @param firstArray - Byte array
 * @param secondArray - Byte array
 * @return true if the first array bytes match the bytes in the second array.
 */
private boolean startsWith(byte[] firstArray, byte[] secondArray) {
  // Null handling: only the "both null" combination is a match.
  if (firstArray == null || secondArray == null) {
    return firstArray == null && secondArray == null;
  }
  // A prefix longer than the array itself can never match.
  if (secondArray.length > firstArray.length) {
    return false;
  }
  // Compare byte-by-byte over the prefix length.
  for (int idx = 0; idx < secondArray.length; idx++) {
    if (firstArray[idx] != secondArray[idx]) {
      return false;
    }
  }
  return true;
}
/**
 * Given a volume, check if it is empty, i.e there are no buckets inside it.
 * We iterate in the bucket table and see if there is any key that starts with
 * the volume prefix. We actually look for /volume/, since if we don't have
 * the trailing slash it is possible that we might match some other volume.
 * <p>
 * For example, vol1 and vol122 might match, to avoid that we look for /vol1/
 *
 * @param volume - Volume name
 * @return true if the volume is empty
 */
@Override
public boolean isVolumeEmpty(String volume) throws IOException {
  // Trailing OM_KEY_PREFIX prevents "vol1" from also matching "vol122".
  String volumePrefix = getVolumeKey(volume + OM_KEY_PREFIX);
  // First check in bucket table cache: a non-null cached value means a
  // bucket exists even if it has not been flushed to the DB yet.
  Iterator<Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>>> iterator =
      ((TypedTable<String, OmBucketInfo>) bucketTable).cacheIterator();
  while (iterator.hasNext()) {
    Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>> entry =
        iterator.next();
    String key = entry.getKey().getCacheKey();
    OmBucketInfo omBucketInfo = entry.getValue().getCacheValue();
    // Making sure that entry is not for delete bucket request (a null
    // cache value marks a pending delete).
    if (key.startsWith(volumePrefix) && omBucketInfo != null) {
      return false;
    }
  }
  // Then seek in the DB itself.
  try (TableIterator<String, ? extends KeyValue<String, OmBucketInfo>>
      bucketIter = bucketTable.iterator()) {
    KeyValue<String, OmBucketInfo> kv = bucketIter.seek(volumePrefix);
    if (kv != null) {
      // Check the entry in db is not marked for delete. This can happen
      // while entry is marked for delete, but it is not flushed to DB.
      CacheValue<OmBucketInfo> cacheValue =
          bucketTable.getCacheValue(new CacheKey(kv.getKey()));
      if (cacheValue != null) {
        if (kv.getKey().startsWith(volumePrefix)
            && cacheValue.getCacheValue() != null) {
          return false; // we found at least one bucket with this volume
          // prefix.
        }
      } else {
        if (kv.getKey().startsWith(volumePrefix)) {
          return false; // we found at least one bucket with this volume
          // prefix.
        }
      }
    }
  }
  return true;
}
/**
 * Given a volume/bucket, check if it is empty, i.e there are no keys inside
 * it. Prefix is /volume/bucket/, and we lookup the keyTable.
 *
 * @param volume - Volume name
 * @param bucket - Bucket name
 * @return true if the bucket is empty
 */
@Override
public boolean isBucketEmpty(String volume, String bucket)
    throws IOException {
  String keyPrefix = getBucketKey(volume, bucket);
  // First check in key table cache: a non-null cached value means a key
  // exists even if it has not been flushed to the DB yet.
  Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>> iterator =
      ((TypedTable<String, OmKeyInfo>) keyTable).cacheIterator();
  while (iterator.hasNext()) {
    Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>> entry =
        iterator.next();
    String key = entry.getKey().getCacheKey();
    OmKeyInfo omKeyInfo = entry.getValue().getCacheValue();
    // Making sure that entry is not for delete key request (a null cache
    // value marks a pending delete).
    if (key.startsWith(keyPrefix) && omKeyInfo != null) {
      return false;
    }
  }
  // Then seek in the DB itself.
  try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> keyIter =
      keyTable.iterator()) {
    KeyValue<String, OmKeyInfo> kv = keyIter.seek(keyPrefix);
    if (kv != null) {
      // Check the entry in db is not marked for delete. This can happen
      // while entry is marked for delete, but it is not flushed to DB.
      CacheValue<OmKeyInfo> cacheValue =
          keyTable.getCacheValue(new CacheKey(kv.getKey()));
      if (cacheValue != null) {
        if (kv.getKey().startsWith(keyPrefix)
            && cacheValue.getCacheValue() != null) {
          return false; // we found at least one key with this vol/bucket
          // prefix.
        }
      } else {
        if (kv.getKey().startsWith(keyPrefix)) {
          return false; // we found at least one key with this vol/bucket
          // prefix.
        }
      }
    }
  }
  return true;
}
/**
 * Lists up to {@code maxNumOfBuckets} buckets of a volume, filtered by an
 * optional name prefix, resuming after an optional start bucket. Served
 * entirely from the (full) bucket table cache.
 *
 * @param volumeName volume to list (required, must exist).
 * @param startBucket if non-blank, listing resumes strictly after it.
 * @param bucketPrefix if non-blank, only buckets with this prefix match.
 * @param maxNumOfBuckets maximum number of results.
 * @throws IOException / OMException when the volume is blank or missing.
 */
@Override
public List<OmBucketInfo> listBuckets(final String volumeName,
    final String startBucket, final String bucketPrefix,
    final int maxNumOfBuckets) throws IOException {
  List<OmBucketInfo> result = new ArrayList<>();
  if (Strings.isNullOrEmpty(volumeName)) {
    throw new OMException("Volume name is required.",
        ResultCodes.VOLUME_NOT_FOUND);
  }
  String volumeNameBytes = getVolumeKey(volumeName);
  if (volumeTable.get(volumeNameBytes) == null) {
    throw new OMException("Volume " + volumeName + " not found.",
        ResultCodes.VOLUME_NOT_FOUND);
  }
  String startKey;
  boolean skipStartKey = false;
  if (StringUtil.isNotBlank(startBucket)) {
    // if the user has specified a start key, we need to seek to that key
    // and avoid that key in the response set.
    startKey = getBucketKey(volumeName, startBucket);
    skipStartKey = true;
  } else {
    // If the user has specified a prefix key, we need to get to the first
    // of the keys with the prefix match. We can leverage RocksDB to do that.
    // However, if the user has specified only a prefix, we cannot skip
    // the first prefix key we see, the boolean skipStartKey allows us to
    // skip the startkey or not depending on what patterns are specified.
    startKey = getBucketKey(volumeName, bucketPrefix);
  }
  String seekPrefix;
  if (StringUtil.isNotBlank(bucketPrefix)) {
    seekPrefix = getBucketKey(volumeName, bucketPrefix);
  } else {
    seekPrefix = getVolumeKey(volumeName + OM_KEY_PREFIX);
  }
  int currentCount = 0;
  // For Bucket it is full cache, so we can just iterate in-memory table
  // cache.
  Iterator<Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>>> iterator =
      bucketTable.cacheIterator();
  while (currentCount < maxNumOfBuckets && iterator.hasNext()) {
    Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>> entry =
        iterator.next();
    String key = entry.getKey().getCacheKey();
    OmBucketInfo omBucketInfo = entry.getValue().getCacheValue();
    // Making sure that entry in cache is not for delete bucket request.
    if (omBucketInfo != null) {
      if (key.equals(startKey) && skipStartKey) {
        continue;
      }
      // We should return only the keys, whose keys match with prefix and
      // the keys after the startBucket. Fixed: use >= so a bucket whose
      // name equals the prefix is not dropped when no startBucket was
      // given (the skipStartKey case is already handled by the continue
      // above).
      if (key.startsWith(seekPrefix) && key.compareTo(startKey) >= 0) {
        result.add(omBucketInfo);
        currentCount++;
      }
    }
  }
  return result;
}
/**
 * Lists up to {@code maxKeys} keys of a bucket, merging the (possibly
 * unflushed) key-table cache with DB contents. Cache entries whose value
 * is null mark pending deletes and suppress the matching DB rows.
 *
 * @param volumeName volume containing the bucket (required).
 * @param bucketName bucket to list (required, must exist).
 * @param startKey   if non-blank, listing resumes strictly after this key.
 * @param keyPrefix  if non-blank, only keys with this prefix are returned.
 * @param maxKeys    maximum number of results; non-positive returns empty.
 * @throws IOException on DB failure, or OMException for missing
 *     volume/bucket.
 */
@Override
public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
    String startKey, String keyPrefix, int maxKeys) throws IOException {
  List<OmKeyInfo> result = new ArrayList<>();
  if (maxKeys <= 0) {
    return result;
  }
  if (Strings.isNullOrEmpty(volumeName)) {
    throw new OMException("Volume name is required.",
        ResultCodes.VOLUME_NOT_FOUND);
  }
  if (Strings.isNullOrEmpty(bucketName)) {
    throw new OMException("Bucket name is required.",
        ResultCodes.BUCKET_NOT_FOUND);
  }
  String bucketNameBytes = getBucketKey(volumeName, bucketName);
  if (getBucketTable().get(bucketNameBytes) == null) {
    throw new OMException("Bucket " + bucketName + " not found.",
        ResultCodes.BUCKET_NOT_FOUND);
  }
  String seekKey;
  boolean skipStartKey = false;
  if (StringUtil.isNotBlank(startKey)) {
    // Seek to the specified key; it is excluded from the result set below.
    seekKey = getOzoneKey(volumeName, bucketName, startKey);
    skipStartKey = true;
  } else {
    // This allows us to seek directly to the first key with the right prefix.
    seekKey = getOzoneKey(volumeName, bucketName, keyPrefix);
  }
  String seekPrefix;
  if (StringUtil.isNotBlank(keyPrefix)) {
    seekPrefix = getOzoneKey(volumeName, bucketName, keyPrefix);
  } else {
    seekPrefix = getBucketKey(volumeName, bucketName + OM_KEY_PREFIX);
  }
  int currentCount = 0;
  // Sorted map merges cache and DB entries in key order.
  TreeMap<String, OmKeyInfo> cacheKeyMap = new TreeMap<>();
  Set<String> deletedKeySet = new TreeSet<>();
  Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>> iterator =
      keyTable.cacheIterator();
  //TODO: We can avoid this iteration if table cache has stored entries in
  // treemap. Currently HashMap is used in Cache. HashMap get operation is an
  // constant time operation, where as for treeMap get is log(n).
  // So if we move to treemap, the get operation will be affected. As get
  // is frequent operation on table. So, for now in list we iterate cache map
  // and construct treeMap which match with keyPrefix and are greater than or
  // equal to startKey. Later we can revisit this, if list operation
  // is becoming slow.
  while (iterator.hasNext()) {
    Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>> entry =
        iterator.next();
    String key = entry.getKey().getCacheKey();
    OmKeyInfo omKeyInfo = entry.getValue().getCacheValue();
    // Making sure that entry in cache is not for delete key request.
    if (omKeyInfo != null) {
      if (key.startsWith(seekPrefix) && key.compareTo(seekKey) >= 0) {
        cacheKeyMap.put(key, omKeyInfo);
      }
    } else {
      // Null cache value: pending delete; remember the key so a stale DB
      // row (if any) is filtered out below.
      deletedKeySet.add(key);
    }
  }
  // Get maxKeys from DB if it has.
  try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
      keyIter = getKeyTable().iterator()) {
    KeyValue<String, OmKeyInfo> kv;
    keyIter.seek(seekKey);
    // we need to iterate maxKeys + 1 here because if skipStartKey is true,
    // we should skip that entry and return the result.
    while (currentCount < maxKeys + 1 && keyIter.hasNext()) {
      kv = keyIter.next();
      if (kv != null && kv.getKey().startsWith(seekPrefix)) {
        // Entry should not be marked for delete, consider only those
        // entries.
        if (!deletedKeySet.contains(kv.getKey())) {
          cacheKeyMap.put(kv.getKey(), kv.getValue());
          currentCount++;
        }
      } else {
        // The SeekPrefix does not match any more, we can break out of the
        // loop.
        break;
      }
    }
  }
  // Finally DB entries and cache entries are merged, then return the count
  // of maxKeys from the sorted map.
  currentCount = 0;
  for (Map.Entry<String, OmKeyInfo> cacheKey : cacheKeyMap.entrySet()) {
    if (cacheKey.getKey().equals(seekKey) && skipStartKey) {
      continue;
    }
    result.add(cacheKey.getValue());
    currentCount++;
    if (currentCount == maxKeys) {
      break;
    }
  }
  // Clear map and set.
  cacheKeyMap.clear();
  deletedKeySet.clear();
  return result;
}
/**
 * Lists volumes of a user, filtered by an optional name prefix, resuming
 * after an optional start key, up to {@code maxKeys} entries.
 *
 * @throws IOException / OMException when the user is blank, or when a
 *     volume name from the user's list has no volume entry (corrupt DB).
 */
@Override
public List<OmVolumeArgs> listVolumes(String userName,
    String prefix, String startKey, int maxKeys) throws IOException {
  List<OmVolumeArgs> result = Lists.newArrayList();
  UserVolumeInfo volumes;
  if (StringUtil.isBlank(userName)) {
    throw new OMException("User name is required to list Volumes.",
        ResultCodes.USER_NOT_FOUND);
  }
  volumes = getVolumesByUser(userName);
  if (volumes == null || volumes.getVolumeNamesCount() == 0) {
    return result;
  }
  // With no startKey we begin from the head of the user's volume list.
  boolean startKeyFound = Strings.isNullOrEmpty(startKey);
  for (String volumeName : volumes.getVolumeNamesList()) {
    if (!Strings.isNullOrEmpty(prefix)) {
      if (!volumeName.startsWith(prefix)) {
        continue;
      }
    }
    // The startKey volume itself is excluded from the results.
    if (!startKeyFound && volumeName.equals(startKey)) {
      startKeyFound = true;
      continue;
    }
    if (startKeyFound && result.size() < maxKeys) {
      OmVolumeArgs volumeArgs =
          getVolumeTable().get(this.getVolumeKey(volumeName));
      if (volumeArgs == null) {
        // Could not get volume info by given volume name,
        // since the volume name is loaded from db,
        // this probably means om db is corrupted or some entries are
        // accidentally removed.
        throw new OMException("Volume info not found for " + volumeName,
            ResultCodes.VOLUME_NOT_FOUND);
      }
      result.add(volumeArgs);
    }
  }
  return result;
}
/**
 * Looks up the volume list for a user key, mapping "no entry" to an empty
 * UserVolumeInfo so callers never see null for a missing user.
 *
 * @param userNameKey DB key of the user (the user name).
 * @throws OMException wrapping any underlying DB read failure.
 */
private UserVolumeInfo getVolumesByUser(String userNameKey)
    throws OMException {
  try {
    UserVolumeInfo userVolInfo = getUserTable().get(userNameKey);
    // No volume found for this user, return an empty list
    return userVolInfo == null
        ? UserVolumeInfo.newBuilder().build()
        : userVolInfo;
  } catch (IOException e) {
    throw new OMException("Unable to get volumes info by the given user, "
        + "metadata might be corrupted", e,
        ResultCodes.METADATA_ERROR);
  }
}
/**
 * Collects entries from the deleted-key table and returns their block IDs
 * grouped per key version, for the background deletion service.
 * {@code currentCount} advances once per contained OmKeyInfo, so a single
 * deletedTable row with several versions yields several BlockGroups.
 *
 * @param keyCount upper bound on the number of BlockGroups returned.
 * @throws IOException on DB iteration failure.
 */
@Override
public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
    throws IOException {
  List<BlockGroup> keyBlocksList = Lists.newArrayList();
  try (TableIterator<String, ? extends KeyValue<String, RepeatedOmKeyInfo>>
      keyIter = getDeletedTable().iterator()) {
    int currentCount = 0;
    while (keyIter.hasNext() && currentCount < keyCount) {
      KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
      if (kv != null) {
        RepeatedOmKeyInfo infoList = kv.getValue();
        // Get block keys as a list.
        for (OmKeyInfo info : infoList.getOmKeyInfoList()) {
          // NOTE(review): assumes every deleted key has at least one
          // version; a null getLatestVersionLocations() would NPE here —
          // confirm that invariant holds for deletedTable entries.
          OmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
          List<BlockID> item = latest.getLocationList().stream()
              .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
              .collect(Collectors.toList());
          BlockGroup keyBlocks = BlockGroup.newBuilder()
              .setKeyName(kv.getKey())
              .addAllBlockIDs(item)
              .build();
          keyBlocksList.add(keyBlocks);
          currentCount++;
        }
      }
    }
  }
  return keyBlocksList;
}
/**
 * Returns open keys that have outlived the expiry threshold.
 * Currently a stub that always returns an empty list (see TODO).
 */
@Override
public List<BlockGroup> getExpiredOpenKeys() throws IOException {
  List<BlockGroup> keyBlocksList = Lists.newArrayList();
  // TODO: Fix the getExpiredOpenKeys, Not part of this patch.
  return keyBlocksList;
}
/**
 * Counts rows by exhaustively iterating the table (O(n); exact count).
 *
 * @param table table to scan; null yields 0.
 * @throws IOException on iteration failure.
 */
@Override
public <KEY, VALUE> long countRowsInTable(Table<KEY, VALUE> table)
    throws IOException {
  if (table == null) {
    return 0;
  }
  long rows = 0;
  try (TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iter =
      table.iterator()) {
    while (iter.hasNext()) {
      iter.next();
      rows++;
    }
  }
  return rows;
}
/**
 * Returns the store's estimated key count for the table (fast, inexact).
 *
 * @param table table to query; null yields 0.
 * @throws IOException on store failure.
 */
@Override
public <KEY, VALUE> long countEstimatedRowsInTable(Table<KEY, VALUE> table)
    throws IOException {
  if (table == null) {
    return 0;
  }
  return table.getEstimatedKeyCount();
}
/**
 * Returns the multipartInfoTable DB keys of all in-flight uploads under
 * the given volume/bucket/key prefix.
 *
 * @throws IOException on DB iteration failure.
 */
@Override
public List<String> getMultipartUploadKeys(
    String volumeName, String bucketName, String prefix) throws IOException {
  List<String> response = new ArrayList<>();
  String prefixKey =
      OmMultipartUpload.getDbKey(volumeName, bucketName, prefix);
  // try-with-resources: the iterator was previously never closed, leaking
  // the underlying DB iterator. Every other method in this class closes
  // its TableIterator this way.
  try (TableIterator<String, ? extends KeyValue<String, OmMultipartKeyInfo>>
      iterator = getMultipartInfoTable().iterator()) {
    iterator.seek(prefixKey);
    while (iterator.hasNext()) {
      KeyValue<String, OmMultipartKeyInfo> entry = iterator.next();
      if (entry.getKey().startsWith(prefixKey)) {
        response.add(entry.getKey());
      } else {
        // Keys are iterated in order; once the prefix stops matching,
        // no later key can match either.
        break;
      }
    }
  }
  return response;
}
/** @return table mapping s3g access key id -> {@link S3SecretValue}. */
@Override
public Table<String, S3SecretValue> getS3SecretTable() {
  return s3SecretTable;
}
/**
 * Update store used by subclass.
 * Note: does not resolve table handles; callers that need the tables must
 * invoke {@link #initializeOmTables()} themselves.
 *
 * @param store DB store.
 */
protected void setStore(DBStore store) {
  this.store = store;
}
}