blob: 81756e1ca184e7c9203ecc66b219dfb816210fcf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import static org.fusesource.leveldbjni.JniDBFactory.factory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LEVELDB_PATH;
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.fromBlockBytes;
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.fromProvidedStorageLocationBytes;
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.toProtoBufBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A LevelDB based implementation of {@link BlockAliasMap}.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class LevelDBFileRegionAliasMap
extends BlockAliasMap<FileRegion> implements Configurable {
private Configuration conf;
private LevelDBOptions opts = new LevelDBOptions();
public static final Logger LOG =
LoggerFactory.getLogger(LevelDBFileRegionAliasMap.class);
@Override
public void setConf(Configuration conf) {
opts.setConf(conf);
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
throws IOException {
if (null == opts) {
opts = this.opts;
}
if (!(opts instanceof LevelDBOptions)) {
throw new IllegalArgumentException("Invalid options " + opts.getClass());
}
LevelDBOptions o = (LevelDBOptions) opts;
return new LevelDBFileRegionAliasMap.LevelDBReader(
createDB(o.levelDBPath, false, blockPoolID));
}
@Override
public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolID)
throws IOException {
if (null == opts) {
opts = this.opts;
}
if (!(opts instanceof LevelDBOptions)) {
throw new IllegalArgumentException("Invalid options " + opts.getClass());
}
LevelDBOptions o = (LevelDBOptions) opts;
return new LevelDBFileRegionAliasMap.LevelDBWriter(
createDB(o.levelDBPath, true, blockPoolID));
}
private static DB createDB(String levelDBPath, boolean createIfMissing,
String blockPoolID) throws IOException {
if (levelDBPath == null || levelDBPath.length() == 0) {
throw new IllegalArgumentException(
"A valid path needs to be specified for "
+ LevelDBFileRegionAliasMap.class + " using the parameter "
+ DFS_PROVIDED_ALIASMAP_LEVELDB_PATH);
}
org.iq80.leveldb.Options options = new org.iq80.leveldb.Options();
options.createIfMissing(createIfMissing);
File dbFile;
if (blockPoolID != null) {
dbFile = new File(levelDBPath, blockPoolID);
} else {
dbFile = new File(levelDBPath);
}
if (createIfMissing && !dbFile.exists()) {
if (!dbFile.mkdirs()) {
throw new IOException("Unable to create " + dbFile);
}
}
return factory.open(dbFile, options);
}
@Override
public void refresh() throws IOException {
}
@Override
public void close() throws IOException {
// Do nothing.
}
/**
* Class specifying reader options for the {@link LevelDBFileRegionAliasMap}.
*/
public static class LevelDBOptions implements LevelDBReader.Options,
LevelDBWriter.Options, Configurable {
private Configuration conf;
private String levelDBPath;
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.levelDBPath = conf.get(DFS_PROVIDED_ALIASMAP_LEVELDB_PATH);
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public LevelDBOptions filename(String levelDBPath) {
this.levelDBPath = levelDBPath;
return this;
}
}
/**
* This class is used as a reader for block maps which
* are stored as LevelDB files.
*/
public static class LevelDBReader extends Reader<FileRegion> {
/**
* Options for {@link LevelDBReader}.
*/
public interface Options extends Reader.Options {
Options filename(String levelDBPath);
}
private DB db;
LevelDBReader(DB db) {
this.db = db;
}
@Override
public Optional<FileRegion> resolve(Block block) throws IOException {
if (db == null) {
return Optional.empty();
}
// consider layering index w/ composable format
byte[] key = toProtoBufBytes(block);
byte[] value = db.get(key);
ProvidedStorageLocation psl = fromProvidedStorageLocationBytes(value);
return Optional.of(new FileRegion(block, psl));
}
static class FRIterator implements Iterator<FileRegion> {
private final DBIterator internal;
FRIterator(DBIterator internal) {
this.internal = internal;
}
@Override
public boolean hasNext() {
return internal.hasNext();
}
@Override
public FileRegion next() {
Map.Entry<byte[], byte[]> entry = internal.next();
if (entry == null) {
return null;
}
try {
Block block = fromBlockBytes(entry.getKey());
ProvidedStorageLocation psl =
fromProvidedStorageLocationBytes(entry.getValue());
return new FileRegion(block, psl);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
public Iterator<FileRegion> iterator() {
if (db == null) {
return null;
}
DBIterator iterator = db.iterator();
iterator.seekToFirst();
return new FRIterator(iterator);
}
@Override
public void close() throws IOException {
if (db != null) {
db.close();
}
}
}
/**
* This class is used as a writer for block maps which
* are stored as LevelDB files.
*/
public static class LevelDBWriter extends Writer<FileRegion> {
/**
* Interface for Writer options.
*/
public interface Options extends Writer.Options {
Options filename(String levelDBPath);
}
private final DB db;
LevelDBWriter(DB db) {
this.db = db;
}
@Override
public void store(FileRegion token) throws IOException {
byte[] key = toProtoBufBytes(token.getBlock());
byte[] value = toProtoBufBytes(token.getProvidedStorageLocation());
db.put(key, value);
}
@Override
public void remove(Block block) throws IOException {
byte[] key = toProtoBufBytes(block);
db.delete(key);
}
@Override
public void close() throws IOException {
if (db != null) {
db.close();
}
}
}
}