blob: fb73b41694402a46005bd2ba4f06b5e5ac10b396 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
import java.io.File;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
import java.util.Map;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* This class is used for block maps stored as text files,
* with a specified delimiter.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class TextFileRegionAliasMap
extends BlockAliasMap<FileRegion> implements Configurable {
private Configuration conf;
private ReaderOptions readerOpts = TextReader.defaults();
private WriterOptions writerOpts = TextWriter.defaults();
public static final Logger LOG =
LoggerFactory.getLogger(TextFileRegionAliasMap.class);
/**
 * Hands the configuration to the default reader and writer options and
 * then records it on this alias map.
 *
 * @param conf configuration passed to both option objects and stored here.
 */
@Override
public void setConf(Configuration conf) {
  // The option objects are updated first, so a failure while they read
  // their keys leaves the configuration stored on this object untouched.
  this.readerOpts.setConf(conf);
  this.writerOpts.setConf(conf);
  this.conf = conf;
}
/**
 * @return the configuration last stored by
 *         {@link #setConf(Configuration)}, or {@code null} if none was set.
 */
@Override
public Configuration getConf() {
  return this.conf;
}
/**
 * Creates a reader for the text alias map of the given block pool.
 *
 * @param opts reader options; when {@code null}, the options held by this
 *             alias map are used. Must be a {@link ReaderOptions}.
 * @param blockPoolID block pool whose alias map file is read.
 * @return the reader produced by
 *         {@link #createReader(Path, String, Configuration, String)}.
 * @throws IllegalArgumentException if the options are not a
 *         {@link ReaderOptions} instance.
 * @throws IOException if the reader cannot be created.
 */
@Override
public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
    throws IOException {
  final Reader.Options chosen = (opts != null) ? opts : readerOpts;
  if (chosen instanceof ReaderOptions) {
    final ReaderOptions textOpts = (ReaderOptions) chosen;
    // Options that were never configured fall back to a fresh Configuration.
    Configuration readerConf = textOpts.getConf();
    if (readerConf == null) {
      readerConf = new Configuration();
    }
    return createReader(
        textOpts.file, textOpts.delim, readerConf, blockPoolID);
  }
  throw new IllegalArgumentException("Invalid options " + chosen.getClass());
}
/**
 * Builds a {@link TextReader} for the per-block-pool file that lives in the
 * same directory as {@code file}.
 *
 * @param file configured alias map path; its parent directory and the codec
 *             inferred from it are used, not the file itself.
 * @param delim field delimiter handed to the reader.
 * @param cfg configuration used to resolve the file system and codecs.
 * @param blockPoolID block pool ID used to derive the file name.
 * @return a reader over {@code <parent>/blocks_<blockPoolID>.csv[ext]}.
 * @throws IOException if the file system cannot be obtained.
 */
@VisibleForTesting
TextReader createReader(Path file, String delim, Configuration cfg,
    String blockPoolID) throws IOException {
  final FileSystem resolved = file.getFileSystem(cfg);
  // A LocalFileSystem is replaced by the raw file system it wraps.
  final FileSystem fs = (resolved instanceof LocalFileSystem)
      ? ((LocalFileSystem) resolved).getRaw()
      : resolved;

  final CompressionCodec codec =
      new CompressionCodecFactory(cfg).getCodec(file);
  final String baseName = fileNameFromBlockPoolID(blockPoolID);
  final String filename = (codec == null)
      ? baseName
      : baseName + codec.getDefaultExtension();

  return new TextReader(
      fs, new Path(file.getParent(), filename), codec, delim);
}
/**
 * Creates a writer for the text alias map of the given block pool.
 *
 * @param opts writer options; when {@code null}, the options held by this
 *             alias map are used. Must be a {@link WriterOptions}.
 * @param blockPoolID block pool ID used to derive the output file name.
 * @return a writer targeting {@code <dir>/blocks_<blockPoolID>.csv}, with
 *         the codec's default extension appended when a codec is set.
 * @throws IllegalArgumentException if the options are not a
 *         {@link WriterOptions} instance.
 * @throws IOException if the configured codec name cannot be resolved or
 *         the output file cannot be created.
 */
@Override
public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolID)
    throws IOException {
  if (null == opts) {
    opts = writerOpts;
  }
  if (!(opts instanceof WriterOptions)) {
    throw new IllegalArgumentException("Invalid options " + opts.getClass());
  }
  WriterOptions o = (WriterOptions) opts;
  Configuration cfg = (null == o.getConf())
      ? new Configuration()
      : o.getConf();
  String baseName = fileNameFromBlockPoolID(blockPoolID);

  if (o.codec == null) {
    // Use the configuration resolved from the options (never null), as the
    // compressed branch does, rather than this object's own conf field,
    // which is null until setConf has been called.
    return createWriter(new Path(o.dir, baseName), null, o.delim, cfg);
  }

  CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
  CompressionCodec codec = factory.getCodecByName(o.codec);
  if (codec == null) {
    // An unresolvable codec name is reported explicitly instead of
    // surfacing as a NullPointerException below.
    throw new IOException("Unknown compression codec: " + o.codec);
  }
  Path blocksFile = new Path(o.dir, baseName + codec.getDefaultExtension());
  return createWriter(blocksFile, codec, o.delim, cfg);
}
/**
 * Creates the output file and wraps it in a UTF-8 {@link TextWriter},
 * compressing through {@code codec} when one is given.
 *
 * @param file path of the file to create.
 * @param codec compression codec, or {@code null} for plain text.
 * @param delim field delimiter handed to the writer.
 * @param cfg configuration used to resolve the file system.
 * @return a writer over the newly created file.
 * @throws IOException if the file or the compression stream cannot be
 *         created.
 */
@VisibleForTesting
TextWriter createWriter(Path file, CompressionCodec codec, String delim,
    Configuration cfg) throws IOException {
  FileSystem fs = file.getFileSystem(cfg);
  // A LocalFileSystem is replaced by the raw file system it wraps.
  if (fs instanceof LocalFileSystem) {
    fs = ((LocalFileSystem)fs).getRaw();
  }
  OutputStream raw = fs.create(file);
  try {
    OutputStream sink =
        (null == codec) ? raw : codec.createOutputStream(raw);
    java.io.Writer out =
        new BufferedWriter(new OutputStreamWriter(sink, "UTF-8"));
    return new TextWriter(out, delim);
  } catch (IOException | RuntimeException e) {
    // Wrapping failed after the file was created: close the raw stream so
    // it does not leak, keeping the original failure as the primary error.
    try {
      raw.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
}
/**
 * Reader-side settings for {@link TextFileRegionAliasMap}: the alias map
 * file path and the field delimiter, optionally loaded from a
 * {@link Configuration}.
 */
public static class ReaderOptions
    implements TextReader.Options, Configurable {

  /** Configuration last supplied via {@link #setConf}; null until then. */
  private Configuration conf;

  /** Field delimiter; starts at the compiled-in default. */
  private String delim =
      DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT;

  /** Alias map file; starts at the default path converted to a URI string. */
  private Path file = new Path(
      new File(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE_DEFAULT)
          .toURI().toString());

  /**
   * Stores the configuration and reloads the file path and delimiter from
   * it, falling back to the defaults for keys that are not set.
   *
   * @param conf configuration to read the settings from.
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    final String configuredFile = conf.get(
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE,
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE_DEFAULT);
    this.file = new Path(configuredFile);
    this.delim = conf.get(
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER,
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT);
    LOG.info("TextFileRegionAliasMap: read path {}", configuredFile);
  }

  /**
   * @return the configuration last passed to {@link #setConf}, or
   *         {@code null} if it was never called.
   */
  @Override
  public Configuration getConf() {
    return this.conf;
  }

  /**
   * Overrides the alias map file path.
   *
   * @param file new path.
   * @return this object, for chaining.
   */
  @Override
  public ReaderOptions filename(Path file) {
    this.file = file;
    return this;
  }

  /**
   * Overrides the field delimiter.
   *
   * @param delim new delimiter.
   * @return this object, for chaining.
   */
  @Override
  public ReaderOptions delimiter(String delim) {
    this.delim = delim;
    return this;
  }
}
/**
 * Writer-side settings for {@link TextFileRegionAliasMap}: the output
 * directory, the optional compression codec name and the field delimiter,
 * optionally loaded from a {@link Configuration}.
 */
public static class WriterOptions
    implements TextWriter.Options, Configurable {

  /** Configuration last supplied via {@link #setConf}; null until then. */
  private Configuration conf;

  /** Codec name; null means no compression codec is configured. */
  private String codec = null;

  /** Output directory; starts at the compiled-in default. */
  private Path dir =
      new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR_DEFAULT);

  /** Field delimiter; starts at the compiled-in default. */
  private String delim =
      DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT;

  /**
   * Stores the configuration and reloads the directory, codec name and
   * delimiter from it. The current directory is kept when the directory key
   * is not set; the codec becomes {@code null} when its key is not set.
   *
   * @param conf configuration to read the settings from.
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    final String configuredDir = conf.get(
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR,
        this.dir.toString());
    this.dir = new Path(configuredDir);
    this.codec = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_CODEC);
    this.delim = conf.get(
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER,
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT);
  }

  /**
   * @return the configuration last passed to {@link #setConf}, or
   *         {@code null} if it was never called.
   */
  @Override
  public Configuration getConf() {
    return this.conf;
  }

  /** @return the configured codec name, or {@code null} if none is set. */
  public String getCodec() {
    return this.codec;
  }

  /** @return the directory the alias map file is written to. */
  public Path getDir() {
    return this.dir;
  }

  /**
   * Overrides the output directory.
   *
   * @param dir new directory.
   * @return this object, for chaining.
   */
  @Override
  public WriterOptions dirName(Path dir) {
    this.dir = dir;
    return this;
  }

  /**
   * Overrides the compression codec name.
   *
   * @param codec new codec name.
   * @return this object, for chaining.
   */
  @Override
  public WriterOptions codec(String codec) {
    this.codec = codec;
    return this;
  }

  /**
   * Overrides the field delimiter.
   *
   * @param delim new delimiter.
   * @return this object, for chaining.
   */
  @Override
  public WriterOptions delimiter(String delim) {
    this.delim = delim;
    return this;
  }
}
/**
 * This class is used as a reader for block maps which
 * are stored as delimited text files.
 * <p>
 * Each iterator returned by {@link #iterator()} is backed by its own
 * {@link BufferedReader}. Open readers are tracked in a synchronized map:
 * a reader is closed and unregistered as soon as its iterator reaches the
 * end of the file, and {@link #close()} releases any that remain.
 */
public static class TextReader extends Reader<FileRegion> {

  /**
   * Options for {@link TextReader}.
   */
  public interface Options extends Reader.Options {
    Options filename(Path file);
    Options delimiter(String delim);
  }

  /** @return a new {@link ReaderOptions} holding the default settings. */
  public static ReaderOptions defaults() {
    return new ReaderOptions();
  }

  private final Path file;
  private final String delim;
  private final FileSystem fs;
  private final CompressionCodec codec;
  /** Readers backing iterators that have not yet reached end of file. */
  private final Map<FRIterator, BufferedReader> iterators;
  /** Derived from the file name at construction; not read in this class. */
  private final String blockPoolID;

  /**
   * Creates a reader that tracks its iterators in an identity-keyed map.
   *
   * @param fs file system holding the alias map file.
   * @param file alias map file to read.
   * @param codec codec used to decompress the file, or {@code null}.
   * @param delim field delimiter (used as a regular expression).
   */
  protected TextReader(FileSystem fs, Path file, CompressionCodec codec,
      String delim) {
    this(fs, file, codec, delim,
        new IdentityHashMap<FRIterator, BufferedReader>());
  }

  /**
   * Same as the four-argument constructor, but with a caller-supplied map
   * for iterator tracking; the map is wrapped in a synchronized view.
   */
  TextReader(FileSystem fs, Path file, CompressionCodec codec, String delim,
      Map<FRIterator, BufferedReader> iterators) {
    this.fs = fs;
    this.file = file;
    this.codec = codec;
    this.delim = delim;
    this.iterators = Collections.synchronizedMap(iterators);
    this.blockPoolID = blockPoolIDFromFileName(file);
  }

  /**
   * Scans the file from the start for the region of the given block.
   *
   * @param ident block to look up.
   * @return the first matching region, or empty if none matches.
   * @throws IOException if closing the underlying reader fails.
   */
  @Override
  public Optional<FileRegion> resolve(Block ident) throws IOException {
    // consider layering index w/ composable format
    Iterator<FileRegion> i = iterator();
    try {
      while (i.hasNext()) {
        FileRegion f = i.next();
        if (f.getBlock().equals(ident)) {
          return Optional.of(f);
        }
      }
    } finally {
      // The reader is still registered only if the scan stopped before end
      // of file (early match or exception); a drained iterator has already
      // closed and unregistered it, in which case remove() yields null.
      BufferedReader r = iterators.remove(i);
      if (r != null) {
        r.close();
      }
    }
    return Optional.empty();
  }

  /**
   * Iterator that holds one parsed region ahead of the caller.
   */
  class FRIterator implements Iterator<FileRegion> {

    /** Next region to hand out; null once the file is exhausted. */
    private FileRegion pending;

    @Override
    public boolean hasNext() {
      return pending != null;
    }

    @Override
    public FileRegion next() {
      if (null == pending) {
        throw new NoSuchElementException();
      }
      FileRegion ret = pending;
      try {
        pending = nextInternal(this);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      return ret;
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Reads and parses the next line for the given iterator.
   *
   * @param i iterator whose registered reader is advanced.
   * @return the parsed region, or {@code null} at end of file.
   * @throws IllegalStateException if no reader is registered for {@code i}.
   * @throws IOException if reading or closing fails, or a line does not
   *         have five or six fields.
   */
  private FileRegion nextInternal(Iterator<FileRegion> i) throws IOException {
    BufferedReader r = iterators.get(i);
    if (null == r) {
      throw new IllegalStateException();
    }
    String line = r.readLine();
    if (null == line) {
      // End of file: the reader leaves the tracking map here, so it must
      // also be closed here; nothing else can reach it afterwards.
      iterators.remove(i);
      r.close();
      return null;
    }
    // NOTE(review): String.split treats delim as a regular expression,
    // while TextWriter emits it literally; a delimiter containing regex
    // metacharacters will not round-trip. Confirm whether quoting is wanted.
    String[] f = line.split(delim);
    if (f.length != 5 && f.length != 6) {
      throw new IOException("Invalid line: " + line);
    }
    // The optional sixth field is a Base64-encoded nonce.
    byte[] nonce = new byte[0];
    if (f.length == 6) {
      nonce = Base64.getDecoder().decode(f[5]);
    }
    return new FileRegion(Long.parseLong(f[0]), new Path(f[1]),
        Long.parseLong(f[2]), Long.parseLong(f[3]), Long.parseLong(f[4]),
        nonce);
  }

  /**
   * Opens the alias map file, decompressing through the codec if one is set.
   *
   * @return a new input stream over the file contents.
   * @throws IOException if the file or the codec stream cannot be opened.
   */
  public InputStream createStream() throws IOException {
    InputStream i = fs.open(file);
    if (codec != null) {
      i = codec.createInputStream(i);
    }
    return i;
  }

  /**
   * Opens the file and returns an iterator positioned on its first region.
   *
   * @return a new iterator backed by its own reader.
   * @throws RuntimeException wrapping any {@link IOException} raised while
   *         opening the file or reading the first line.
   */
  @Override
  public Iterator<FileRegion> iterator() {
    FRIterator i = new FRIterator();
    BufferedReader r = null;
    try {
      r = new BufferedReader(new InputStreamReader(createStream(), "UTF-8"));
      iterators.put(i, r);
      i.pending = nextInternal(i);
    } catch (IOException e) {
      iterators.remove(i);
      if (r != null) {
        // The reader was opened but the first read failed; close it so the
        // stream is not left open once it is no longer tracked.
        try {
          r.close();
        } catch (IOException closeFailure) {
          e.addSuppressed(closeFailure);
        }
      }
      throw new RuntimeException(e);
    }
    return i;
  }

  /**
   * Closes every reader that is still registered and clears the tracking
   * map. All readers are attempted even if some fail to close.
   *
   * @throws IOException combining the failures of all readers that could
   *         not be closed.
   */
  @Override
  public void close() throws IOException {
    ArrayList<IOException> ex = new ArrayList<>();
    synchronized (iterators) {
      for (Iterator<BufferedReader> i = iterators.values().iterator();
           i.hasNext();) {
        try {
          BufferedReader r = i.next();
          r.close();
        } catch (IOException e) {
          ex.add(e);
        } finally {
          i.remove();
        }
      }
      iterators.clear();
    }
    if (!ex.isEmpty()) {
      throw MultipleIOException.createIOException(ex);
    }
  }
}
/**
 * This class is used as a writer for block maps which
 * are stored as delimited text files. Each stored region becomes one
 * newline-terminated line.
 */
public static class TextWriter extends Writer<FileRegion> {

  /**
   * Interface for Writer options.
   */
  public interface Options extends Writer.Options {
    Options codec(String codec);
    Options dirName(Path dir);
    Options delimiter(String delim);
  }

  /** @return a new {@link WriterOptions} holding the default settings. */
  public static WriterOptions defaults() {
    return new WriterOptions();
  }

  private final String delim;
  private final java.io.Writer out;

  /**
   * @param out destination the lines are appended to; closed by
   *            {@link #close()}.
   * @param delim delimiter written literally between fields.
   */
  public TextWriter(java.io.Writer out, String delim) {
    this.out = out;
    this.delim = delim;
  }

  /**
   * Appends one line for the region, with fields in the order: block ID,
   * path, offset, length, generation stamp and, only when the nonce is
   * non-empty, the Base64-encoded nonce.
   *
   * @param token region to write.
   * @throws IOException if the underlying writer fails.
   */
  @Override
  public void store(FileRegion token) throws IOException {
    final Block block = token.getBlock();
    final ProvidedStorageLocation psl = token.getProvidedStorageLocation();
    out.append(String.valueOf(block.getBlockId())).append(delim);
    out.append(psl.getPath().toString()).append(delim);
    out.append(Long.toString(psl.getOffset())).append(delim);
    out.append(Long.toString(psl.getLength())).append(delim);
    out.append(Long.toString(block.getGenerationStamp()));
    // A missing nonce is treated like an empty one: no sixth field.
    final byte[] nonce = psl.getNonce();
    if (nonce != null && nonce.length > 0) {
      out.append(delim)
          .append(Base64.getEncoder().encodeToString(nonce));
    }
    out.append("\n");
  }

  /**
   * Block removal is not supported by the text format.
   *
   * @throws UnsupportedOperationException always.
   */
  @Override
  public void remove(Block block) throws IOException {
    throw new UnsupportedOperationException(
        "TextFileWriter does not support " + "block removal");
  }

  /**
   * Closes the underlying writer.
   *
   * @throws IOException if closing fails.
   */
  @Override
  public void close() throws IOException {
    out.close();
  }
}
/**
 * Refreshing is not supported by this alias map.
 *
 * @throws UnsupportedOperationException always.
 */
@Override
public void refresh() throws IOException {
  final String reason = "Refresh not supported by " + getClass();
  throw new UnsupportedOperationException(reason);
}
/**
 * Closes this alias map. The method body is empty: this object itself
 * releases nothing here.
 *
 * @throws IOException declared by the signature; never thrown by this body.
 */
@Override
public void close() throws IOException {
  // nothing to do
}
/**
 * Extracts the block pool ID from an alias map file name of the form
 * {@code blocks_<blockPoolID>.csv[.ext]}.
 * <p>
 * The first {@code "blocks_".length()} characters of the name are dropped
 * without checking that they are that prefix, and the result is cut at the
 * first dot. A name shorter than the prefix makes {@code substring} throw.
 *
 * @param file alias map file, may be {@code null}.
 * @return the block pool ID, or the empty string when {@code file} is
 *         {@code null}.
 */
@VisibleForTesting
public static String blockPoolIDFromFileName(Path file) {
  if (file != null) {
    final String afterPrefix =
        file.getName().substring("blocks_".length());
    final String[] dotSeparated = afterPrefix.split("\\.");
    return dotSeparated[0];
  }
  return "";
}
@VisibleForTesting
public static String fileNameFromBlockPoolID(String blockPoolID) {
  // Alias map files are named "blocks_<blockPoolID>.csv"; a null ID is
  // rendered as the text "null", exactly as string concatenation would.
  final StringBuilder name = new StringBuilder("blocks_");
  name.append(blockPoolID);
  name.append(".csv");
  return name.toString();
}
}