blob: fe64f175770acb46e3b7af118d6959ebf30c770a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.hdfs.blobstore;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.regex.Matcher;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.storm.blobstore.BlobStoreFile;
import org.apache.storm.generated.SettableBlobMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HdfsBlobStoreFile extends BlobStoreFile {
// files are world-wide readable and owner writable
public static final FsPermission BLOBSTORE_FILE_PERMISSION =
FsPermission.createImmutable((short) 0644); // rw-r--r--
private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreFile.class);
private final String key;
private final boolean isTmp;
private final Path path;
private final boolean mustBeNew;
private final Configuration hadoopConf;
private final FileSystem fileSystem;
private SettableBlobMeta settableBlobMeta;
public HdfsBlobStoreFile(Path base, String name, Configuration hconf) {
if (BLOBSTORE_DATA_FILE.equals(name)) {
isTmp = false;
} else {
Matcher m = TMP_NAME_PATTERN.matcher(name);
if (!m.matches()) {
throw new IllegalArgumentException("File name does not match '" + name + "' !~ " + TMP_NAME_PATTERN);
}
isTmp = true;
}
hadoopConf = hconf;
key = base.getName();
path = new Path(base, name);
mustBeNew = false;
try {
fileSystem = path.getFileSystem(hadoopConf);
} catch (IOException e) {
throw new RuntimeException("Error getting filesystem for path: " + path, e);
}
}
public HdfsBlobStoreFile(Path base, boolean isTmp, boolean mustBeNew, Configuration hconf) {
key = base.getName();
hadoopConf = hconf;
this.isTmp = isTmp;
this.mustBeNew = mustBeNew;
if (this.isTmp) {
path = new Path(base, System.currentTimeMillis() + TMP_EXT);
} else {
path = new Path(base, BLOBSTORE_DATA_FILE);
}
try {
fileSystem = path.getFileSystem(hadoopConf);
} catch (IOException e) {
throw new RuntimeException("Error getting filesystem for path: " + path, e);
}
}
@Override
public void delete() throws IOException {
fileSystem.delete(path, true);
}
@Override
public boolean isTmp() {
return isTmp;
}
@Override
public String getKey() {
return key;
}
@Override
public long getModTime() throws IOException {
return fileSystem.getFileStatus(path).getModificationTime();
}
private void checkIsNotTmp() {
if (!isTmp()) {
throw new IllegalStateException("Can only operate on a temporary blobstore file.");
}
}
private void checkIsTmp() {
if (isTmp()) {
throw new IllegalStateException("Cannot operate on a temporary blobstore file.");
}
}
@Override
public InputStream getInputStream() throws IOException {
checkIsTmp();
return fileSystem.open(path);
}
@Override
public OutputStream getOutputStream() throws IOException {
checkIsNotTmp();
OutputStream out = null;
FsPermission fileperms = new FsPermission(BLOBSTORE_FILE_PERMISSION);
try {
out = fileSystem.create(path, (short) this.getMetadata().get_replication_factor());
fileSystem.setPermission(path, fileperms);
fileSystem.setReplication(path, (short) this.getMetadata().get_replication_factor());
} catch (IOException e) {
//Try to create the parent directory, may not work
FsPermission dirperms = new FsPermission(HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION);
if (!fileSystem.mkdirs(path.getParent(), dirperms)) {
LOG.warn("error creating parent dir: " + path.getParent());
}
if (!fileSystem.getFileStatus(path.getParent()).getPermission().equals(dirperms)) {
LOG.warn("Directory {} created with unexpected permission {}.Set permission {} for this directory.",
path.getParent(), fileSystem.getFileStatus(path.getParent()).getPermission(), dirperms);
fileSystem.setPermission(path.getParent(), dirperms);
}
out = fileSystem.create(path, (short) this.getMetadata().get_replication_factor());
fileSystem.setPermission(path, dirperms);
fileSystem.setReplication(path, (short) this.getMetadata().get_replication_factor());
}
if (out == null) {
throw new IOException("Error in creating: " + path);
}
return out;
}
@Override
public void commit() throws IOException {
checkIsNotTmp();
// FileContext supports atomic rename, whereas FileSystem doesn't
FileContext fc = FileContext.getFileContext(hadoopConf);
Path dest = new Path(path.getParent(), BLOBSTORE_DATA_FILE);
if (mustBeNew) {
fc.rename(path, dest);
} else {
fc.rename(path, dest, Options.Rename.OVERWRITE);
}
// Note, we could add support for setting the replication factor
}
@Override
public void cancel() throws IOException {
checkIsNotTmp();
delete();
}
@Override
public String toString() {
return path + ":" + (isTmp ? "tmp" : BlobStoreFile.BLOBSTORE_DATA_FILE) + ":" + key;
}
@Override
public long getFileLength() throws IOException {
return fileSystem.getFileStatus(path).getLen();
}
@Override
public SettableBlobMeta getMetadata() {
return settableBlobMeta;
}
@Override
public void setMetadata(SettableBlobMeta meta) {
this.settableBlobMeta = meta;
}
}