blob: 26ad1f23d358b0028c079ade9fb6d4302a682cb1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.igfs.secondary.local;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.nio.file.attribute.PosixFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
import org.apache.ignite.igfs.IgfsPathIsNotDirectoryException;
import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
import org.apache.ignite.internal.processors.igfs.IgfsDataManager;
import org.apache.ignite.internal.processors.igfs.IgfsImpl;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemBlockKey;
import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemIgfsFile;
import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemPositionedReadable;
import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemSizeVisitor;
import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.resources.FileSystemResource;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
/**
* Secondary file system which delegates to local file system.
*/
public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, LifecycleAware {
/** Path that will be added to each passed path. */
private String workDir;
/** Logger. */
@SuppressWarnings("unused")
@LoggerResource
private IgniteLogger log;
/** IGFS instance. */
@SuppressWarnings("unused")
@FileSystemResource
private IgfsImpl igfs;
/**
* Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
*
* @param e Exception to check.
* @param msg Detailed error message.
* @return Appropriate exception.
*/
private IgfsException handleSecondaryFsError(IOException e, String msg) {
if (e instanceof FileNotFoundException)
return new IgfsPathNotFoundException(e);
else
return new IgfsException(msg, e);
}
/** {@inheritDoc} */
@Override public boolean exists(IgfsPath path) {
return fileForPath(path).exists();
}
/** {@inheritDoc} */
@Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
File f = fileForPath(path);
if (!f.exists())
return null;
updatePropertiesIfNeeded(path, props);
return info(path);
}
/** {@inheritDoc} */
@Override public void rename(IgfsPath src, IgfsPath dest) {
File srcFile = fileForPath(src);
File destFile = fileForPath(dest);
if (!srcFile.exists())
throw new IgfsPathNotFoundException("Failed to perform rename because source path not found: " + src);
if (srcFile.isDirectory() && destFile.isFile())
throw new IgfsPathIsNotDirectoryException("Failed to perform rename because destination path is " +
"directory and source path is file [src=" + src + ", dest=" + dest + ']');
try {
if (destFile.isDirectory())
Files.move(srcFile.toPath(), destFile.toPath().resolve(srcFile.getName()));
else if (!srcFile.renameTo(destFile))
throw new IgfsException("Failed to perform rename (underlying file system returned false) " +
"[src=" + src + ", dest=" + dest + ']');
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to rename [src=" + src + ", dest=" + dest + ']');
}
}
/** {@inheritDoc} */
@Override public boolean delete(IgfsPath path, boolean recursive) {
File f = fileForPath(path);
if (!recursive)
return f.delete();
else
return deleteRecursive(f, false);
}
/**
* Delete directory recursively.
*
* @param f Directory.
* @param deleteIfExists Ignore delete errors if the file doesn't exist.
* @return {@code true} if successful.
*/
private boolean deleteRecursive(File f, boolean deleteIfExists) {
BasicFileAttributes attrs;
try {
attrs = Files.readAttributes(f.toPath(), BasicFileAttributes.class, LinkOption.NOFOLLOW_LINKS);
}
catch (IOException ignore) {
return deleteIfExists && !f.exists();
}
if (!attrs.isDirectory() || attrs.isSymbolicLink())
return f.delete() || (deleteIfExists && !f.exists());
File[] entries = f.listFiles();
if (entries != null) {
for (File entry : entries) {
boolean res = deleteRecursive(entry, true);
if (!res)
return false;
}
}
return f.delete() || (deleteIfExists && !f.exists());
}
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path) {
if (!mkdirs0(fileForPath(path)))
throw new IgniteException("Failed to make directories (underlying file system returned false): " + path);
}
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
mkdirs(path);
updatePropertiesIfNeeded(path, props);
}
/**
* Create directories.
*
* @param dir Directory.
* @return Result.
*/
private boolean mkdirs0(@Nullable File dir) {
if (dir == null)
return true; // Nothing to create.
if (dir.exists())
// Already exists, so no-op.
return dir.isDirectory();
else {
File parentDir = dir.getParentFile();
if (!mkdirs0(parentDir)) // Create parent first.
return false;
boolean res = dir.mkdir();
if (!res)
res = dir.exists(); // Tolerate concurrent creation.
return res;
}
}
/** {@inheritDoc} */
@Override public Collection<IgfsPath> listPaths(IgfsPath path) {
File[] entries = listFiles0(path);
if (F.isEmpty(entries))
return Collections.emptySet();
else {
Collection<IgfsPath> res = U.newHashSet(entries.length);
for (File entry : entries)
res.add(igfsPath(entry));
return res;
}
}
/** {@inheritDoc} */
@Override public Collection<IgfsFile> listFiles(IgfsPath path) {
File[] entries = listFiles0(path);
if (F.isEmpty(entries))
return Collections.emptySet();
else {
Collection<IgfsFile> res = U.newHashSet(entries.length);
for (File entry : entries) {
IgfsFile info = info(igfsPath(entry));
if (info != null)
res.add(info);
}
return res;
}
}
/**
* Returns an array of File object. Under the specific path.
*
* @param path IGFS path.
* @return Array of File objects.
*/
@Nullable private File[] listFiles0(IgfsPath path) {
File f = fileForPath(path);
if (!f.exists())
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
else
return f.listFiles();
}
/** {@inheritDoc} */
@Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
try {
FileInputStream in = new FileInputStream(fileForPath(path));
return new LocalFileSystemPositionedReadable(in, bufSize);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to open file for read: " + path);
}
}
/** {@inheritDoc} */
@Override public OutputStream create(IgfsPath path, boolean overwrite) {
return create0(path, overwrite);
}
/** {@inheritDoc} */
@Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
long blockSize, @Nullable Map<String, String> props) {
OutputStream os = create0(path, overwrite);
try {
updatePropertiesIfNeeded(path, props);
return os;
}
catch (Exception err) {
try {
os.close();
}
catch (IOException closeErr) {
err.addSuppressed(closeErr);
}
throw err;
}
}
/** {@inheritDoc} */
@Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
@Nullable Map<String, String> props) {
try {
File file = fileForPath(path);
boolean exists = file.exists();
if (exists) {
OutputStream os = new FileOutputStream(file, true);
try {
updatePropertiesIfNeeded(path, props);
return os;
}
catch (Exception err) {
try {
os.close();
throw err;
}
catch (IOException closeErr) {
err.addSuppressed(closeErr);
throw err;
}
}
}
else {
if (create)
return create(path, bufSize, false, 0, 0, props);
else
throw new IgfsPathNotFoundException("Failed to append to file because it doesn't exist: " + path);
}
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to append to file because it doesn't exist: " + path);
}
}
/** {@inheritDoc} */
@Override public IgfsFile info(final IgfsPath path) {
File file = fileForPath(path);
if (!file.exists())
return null;
boolean isDir = file.isDirectory();
PosixFileAttributes attrs = LocalFileSystemUtils.posixAttributes(file);
Map<String, String> props = LocalFileSystemUtils.posixAttributesToMap(attrs);
BasicFileAttributes basicAttrs = LocalFileSystemUtils.basicAttributes(file);
if (isDir) {
return new LocalFileSystemIgfsFile(path, false, true, 0,
basicAttrs.lastAccessTime().toMillis(), basicAttrs.lastModifiedTime().toMillis(), 0, props);
}
else {
return new LocalFileSystemIgfsFile(path, file.isFile(), false, 0,
basicAttrs.lastAccessTime().toMillis(), basicAttrs.lastModifiedTime().toMillis(), file.length(), props);
}
}
/** {@inheritDoc} */
@Override public long usedSpaceSize() {
Path p = fileForPath(IgfsPath.ROOT).toPath();
try {
LocalFileSystemSizeVisitor visitor = new LocalFileSystemSizeVisitor();
Files.walkFileTree(p, visitor);
return visitor.size();
}
catch (IOException e) {
throw new IgfsException("Failed to calculate used space size.", e);
}
}
/** {@inheritDoc} */
@Override public void setTimes(IgfsPath path, long modificationTime, long accessTime) throws IgniteException {
Path p = fileForPath(path).toPath();
if (!Files.exists(p))
throw new IgfsPathNotFoundException("Failed to set times (path not found): " + path);
try {
Files.getFileAttributeView(p, BasicFileAttributeView.class)
.setTimes(
(modificationTime >= 0) ? FileTime.from(modificationTime, TimeUnit.MILLISECONDS) : null,
(accessTime >= 0) ? FileTime.from(accessTime, TimeUnit.MILLISECONDS) : null,
null);
}
catch (IOException e) {
throw new IgniteException("Failed to set times for path: " + path, e);
}
}
/** {@inheritDoc} */
@Override public void start() throws IgniteException {
if (workDir != null)
workDir = new File(workDir).getAbsolutePath();
}
/** {@inheritDoc} */
@Override public void stop() throws IgniteException {
// No-op.
}
/** {@inheritDoc} */
@Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
long maxLen) throws IgniteException {
File f = fileForPath(path);
if (!f.exists())
throw new IgfsPathNotFoundException("File not found: " + path);
// Create fake block & fake affinity for blocks
long blockSize = igfs.configuration().getBlockSize();
if (maxLen <= 0)
maxLen = Long.MAX_VALUE;
assert maxLen > 0 : "maxLen : " + maxLen;
long end = start + len;
Collection<IgfsBlockLocation> blocks = new ArrayList<>((int)(len / maxLen));
IgfsDataManager data = igfs.context().data();
Collection<ClusterNode> lastNodes = null;
long lastBlockIdx = -1;
IgfsBlockLocationImpl lastBlock = null;
for (long offset = start; offset < end; ) {
long blockIdx = offset / blockSize;
// Each step is min of maxLen and end of block.
long lenStep = Math.min(
maxLen - (lastBlock != null ? lastBlock.length() : 0),
(blockIdx + 1) * blockSize - offset);
lenStep = Math.min(lenStep, end - offset);
// Create fake affinity key to map blocks of secondary filesystem to nodes.
LocalFileSystemBlockKey affKey = new LocalFileSystemBlockKey(path, blockIdx);
if (blockIdx != lastBlockIdx) {
Collection<ClusterNode> nodes = data.affinityNodes(affKey);
if (!nodes.equals(lastNodes) && lastNodes != null && lastBlock != null) {
blocks.add(lastBlock);
lastBlock = null;
}
lastNodes = nodes;
lastBlockIdx = blockIdx;
}
if (lastBlock == null)
lastBlock = new IgfsBlockLocationImpl(offset, lenStep, lastNodes);
else
lastBlock.increaseLength(lenStep);
if (lastBlock.length() == maxLen || lastBlock.start() + lastBlock.length() == end) {
blocks.add(lastBlock);
lastBlock = null;
}
offset += lenStep;
}
return blocks;
}
/**
* Get work directory.
*
* @return Work directory.
*/
@Nullable public String getWorkDirectory() {
return workDir;
}
/**
* Set work directory.
*
* @param workDir Work directory.
*/
public void setWorkDirectory(@Nullable String workDir) {
this.workDir = workDir;
}
/**
* Create file for IGFS path.
*
* @param path IGFS path.
* @return File object.
*/
private File fileForPath(IgfsPath path) {
if (workDir == null)
return new File(path.toString());
else {
if ("/".equals(path.toString()))
return new File(workDir);
else
return new File(workDir, path.toString());
}
}
/**
* Create IGFS path for file.
*
* @param f File object.
* @return IFGS path.
* @throws IgfsException If failed.
*/
private IgfsPath igfsPath(File f) throws IgfsException {
String path = f.getAbsolutePath();
if (workDir != null) {
if (!path.startsWith(workDir))
throw new IgfsException("Path is not located in the work directory [workDir=" + workDir +
", path=" + path + ']');
path = path.substring(workDir.length(), path.length());
}
return new IgfsPath(path);
}
/**
* Internal create routine.
*
* @param path Path.
* @param overwrite Overwrite flag.
* @return Output stream.
*/
private OutputStream create0(IgfsPath path, boolean overwrite) {
File file = fileForPath(path);
boolean exists = file.exists();
if (exists) {
if (!overwrite)
throw new IgfsPathAlreadyExistsException("Failed to create a file because it already exists: " + path);
}
else {
File parent = file.getParentFile();
if (!mkdirs0(parent))
throw new IgfsException("Failed to create parent directory for file (underlying file system " +
"returned false): " + path);
}
try {
return new FileOutputStream(file);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + ']');
}
}
/**
* Update path properties if needed.
*
* @param path IGFS path
* @param props Properties map.
*/
private void updatePropertiesIfNeeded(IgfsPath path, Map<String, String> props) {
if (props == null || props.isEmpty())
return;
File file = fileForPath(path);
if (!file.exists())
throw new IgfsPathNotFoundException("Failed to update properties for path: " + path);
LocalFileSystemUtils.updateProperties(file, props.get(IgfsUtils.PROP_GROUP_NAME),
props.get(IgfsUtils.PROP_PERMISSION));
}
}