/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.impl.delegate;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.ignite.IgniteException;
import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.igfs.IgfsUserContext;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate;
import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProperties;
import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
/**
* Secondary file system implementation that delegates IGFS operations to an underlying Hadoop {@link FileSystem}.
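* <p>A minimal configuration sketch (the URI and user name below are illustrative;
* setter names follow the public Ignite Hadoop accelerator API):
* <pre>{@code
* CachingHadoopFileSystemFactory factory = new CachingHadoopFileSystemFactory();
* factory.setUri("hdfs://namenode:9000");
*
* IgniteHadoopIgfsSecondaryFileSystem secFs = new IgniteHadoopIgfsSecondaryFileSystem();
* secFs.setFileSystemFactory(factory);
* secFs.setDefaultUserName("ignite");
* }</pre>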
*/
@SuppressWarnings("unused")
public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSecondaryFileSystemDelegate {
/** The default user name. It is used if no user context is set. */
private final String dfltUsrName;
/** Factory. */
private final HadoopFileSystemFactoryDelegate factory;
/**
* Constructor.
*
* @param proxy Secondary file system proxy whose factory and default user name are used.
*/
public HadoopIgfsSecondaryFileSystemDelegateImpl(IgniteHadoopIgfsSecondaryFileSystem proxy) {
assert proxy.getFileSystemFactory() != null;
dfltUsrName = IgfsUtils.fixUserName(proxy.getDefaultUserName());
HadoopFileSystemFactory factory0 = proxy.getFileSystemFactory();
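// Defensive fallback: the factory is asserted to be non-null above, but default to a caching factory just in case.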
if (factory0 == null)
factory0 = new CachingHadoopFileSystemFactory();
factory = HadoopDelegateUtils.fileSystemFactoryDelegate(getClass().getClassLoader(), factory0);
}
/** {@inheritDoc} */
@Override public boolean exists(IgfsPath path) {
try {
return fileSystemForUser().exists(convert(path));
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
}
}
/** {@inheritDoc} */
@Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
final FileSystem fileSys = fileSystemForUser();
Path hadoopPath = convert(path);
try {
if (!fileSys.exists(hadoopPath))
return null;
if (props0.userName() != null || props0.groupName() != null)
fileSys.setOwner(hadoopPath, props0.userName(), props0.groupName());
if (props0.permission() != null)
fileSys.setPermission(hadoopPath, props0.permission());
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
}
return info(path);
}
/** {@inheritDoc} */
@Override public void rename(IgfsPath src, IgfsPath dest) {
// Delegate to the secondary file system.
try {
if (!fileSystemForUser().rename(convert(src), convert(dest)))
throw new IgfsException("Failed to rename (secondary file system returned false) " +
"[src=" + src + ", dest=" + dest + ']');
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
}
}
/** {@inheritDoc} */
@Override public boolean delete(IgfsPath path, boolean recursive) {
try {
return fileSystemForUser().delete(convert(path), recursive);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
}
}
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path) {
try {
if (!fileSystemForUser().mkdirs(convert(path)))
throw new IgniteException("Failed to make directories [path=" + path + "]");
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
}
}
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
try {
if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
}
}
/** {@inheritDoc} */
@Override public Collection<IgfsPath> listPaths(IgfsPath path) {
try {
FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
if (statuses == null)
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
Collection<IgfsPath> res = new ArrayList<>(statuses.length);
for (FileStatus status : statuses)
res.add(new IgfsPath(path, status.getPath().getName()));
return res;
}
catch (FileNotFoundException ignored) {
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
}
}
/** {@inheritDoc} */
@Override public Collection<IgfsFile> listFiles(IgfsPath path) {
try {
FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
if (statuses == null)
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
Collection<IgfsFile> res = new ArrayList<>(statuses.length);
for (FileStatus s : statuses) {
IgfsEntryInfo fsInfo = s.isDirectory() ?
IgfsUtils.createDirectory(
IgniteUuid.randomUuid(),
null,
properties(s),
s.getAccessTime(),
s.getModificationTime()
) :
IgfsUtils.createFile(
IgniteUuid.randomUuid(),
(int)s.getBlockSize(),
s.getLen(),
null,
null,
false,
properties(s),
s.getAccessTime(),
s.getModificationTime()
);
res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1));
}
return res;
}
catch (FileNotFoundException ignored) {
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
}
}
/** {@inheritDoc} */
@Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize);
}
/** {@inheritDoc} */
@Override public OutputStream create(IgfsPath path, boolean overwrite) {
try {
return fileSystemForUser().create(convert(path), overwrite);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
}
}
/** {@inheritDoc} */
@Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
long blockSize, @Nullable Map<String, String> props) {
HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
try {
return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize,
(short) replication, blockSize, null);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
", blockSize=" + blockSize + "]");
}
}
/** {@inheritDoc} */
@Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
@Nullable Map<String, String> props) {
try {
Path hadoopPath = convert(path);
FileSystem fs = fileSystemForUser();
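// append() requires an existing file, so when 'create' is requested and the file is missing, create it instead.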
if (create && !fs.exists(hadoopPath))
return fs.create(hadoopPath, false, bufSize);
else
return fs.append(hadoopPath, bufSize);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
}
}
/** {@inheritDoc} */
@Override public IgfsFile info(final IgfsPath path) {
try {
final FileStatus status = fileSystemForUser().getFileStatus(convert(path));
if (status == null)
return null;
final Map<String, String> props = properties(status);
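// Adapt the Hadoop FileStatus to the IgfsFile interface and copy it into an IgfsFileImpl snapshot.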
return new IgfsFileImpl(new IgfsFile() {
@Override public IgfsPath path() {
return path;
}
@Override public boolean isFile() {
return status.isFile();
}
@Override public boolean isDirectory() {
return status.isDirectory();
}
@Override public int blockSize() {
// By convention, a directory has blockSize == 0, while a file has blockSize > 0:
return isDirectory() ? 0 : (int)status.getBlockSize();
}
@Override public long groupBlockSize() {
return status.getBlockSize();
}
@Override public long accessTime() {
return status.getAccessTime();
}
@Override public long modificationTime() {
return status.getModificationTime();
}
@Override public String property(String name) throws IllegalArgumentException {
String val = props.get(name);
if (val == null)
throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
return val;
}
@Nullable @Override public String property(String name, @Nullable String dfltVal) {
String val = props.get(name);
return val == null ? dfltVal : val;
}
@Override public long length() {
return status.getLen();
}
/** {@inheritDoc} */
@Override public Map<String, String> properties() {
return props;
}
}, 0);
}
catch (FileNotFoundException ignored) {
return null;
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
}
}
/** {@inheritDoc} */
@Override public long usedSpaceSize() {
try {
// We don't use FileSystem#getUsed() since it counts only the files
// in the filesystem root, not all the files recursively.
return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed();
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
}
}
/** {@inheritDoc} */
@Override public void setTimes(IgfsPath path, long modificationTime, long accessTime) throws IgniteException {
try {
fileSystemForUser().setTimes(convert(path), modificationTime, accessTime);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed set times for path: " + path);
}
}
/** {@inheritDoc} */
@Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
long maxLen) throws IgniteException {
try {
BlockLocation[] hadoopBlocks = fileSystemForUser().getFileBlockLocations(convert(path), start, len);
List<IgfsBlockLocation> blks = new ArrayList<>(hadoopBlocks.length);
for (BlockLocation block : hadoopBlocks)
blks.add(convertBlockLocation(block));
return blks;
}
catch (FileNotFoundException ignored) {
return Collections.emptyList();
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed affinity for path: " + path);
}
}
/** {@inheritDoc} */
@Override public void start() {
factory.start();
}
/** {@inheritDoc} */
@Override public void stop() {
factory.stop();
}
/**
* Convert IGFS path into Hadoop path.
*
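* For example, if the user's file system URI is {@code hdfs://namenode:9000} (an
* illustrative address), the IGFS path {@code /dir/file} becomes
* {@code hdfs://namenode:9000/dir/file}:
* <pre>{@code
* // Scheme and authority are taken from the resolved FileSystem URI:
* new Path("hdfs", "namenode:9000", "/dir/file"); // -> hdfs://namenode:9000/dir/file
* }</pre>
*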
* @param path IGFS path.
* @return Hadoop path.
*/
private Path convert(IgfsPath path) {
URI uri = fileSystemForUser().getUri();
return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
}
/**
* Convert Hadoop block location into IGFS affinity block location.
*
* @param block Hadoop block location.
* @return IGFS affinity block location.
*/
private IgfsBlockLocation convertBlockLocation(BlockLocation block) {
try {
String[] names = block.getNames();
String[] hosts = block.getHosts();
return new IgfsBlockLocationImpl(
block.getOffset(), block.getLength(),
Arrays.asList(names), Arrays.asList(hosts));
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to convert block location: " + block);
}
}
/**
* Converts an IO exception thrown by the secondary file system into an appropriate IGFS exception.
*
* @param e Exception to check.
* @param detailMsg Detailed error message.
* @return Appropriate exception.
*/
private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
return cast(detailMsg, e);
}
/**
* Cast IO exception to IGFS exception.
*
* @param msg Error message.
* @param e IO exception.
* @return IGFS exception.
*/
public static IgfsException cast(String msg, IOException e) {
if (e instanceof FileNotFoundException)
return new IgfsPathNotFoundException(e);
else if (e instanceof ParentNotDirectoryException)
return new IgfsParentNotDirectoryException(msg, e);
else if (e instanceof PathIsNotEmptyDirectoryException)
return new IgfsDirectoryNotEmptyException(e);
else if (e instanceof PathExistsException)
return new IgfsPathAlreadyExistsException(msg, e);
else
return new IgfsException(msg, e);
}
/**
* Convert Hadoop FileStatus properties to map.
*
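* For example, a {@code rwxr-xr-x} permission is stored under
* {@code IgfsUtils.PROP_PERMISSION} as the octal string {@code "0755"} (illustrative snippet):
* <pre>{@code
* FsPermission perm = new FsPermission((short)0755);
* String.format("%04o", perm.toShort()); // -> "0755"
* }</pre>
*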
* @param status File status.
* @return IGFS attributes.
*/
private static Map<String, String> properties(FileStatus status) {
FsPermission perm = status.getPermission();
if (perm == null)
perm = FsPermission.getDefault();
HashMap<String, String> res = new HashMap<>(3);
res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort()));
res.put(IgfsUtils.PROP_USER_NAME, status.getOwner());
res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup());
return res;
}
/**
* Gets the FileSystem for the current context user, falling back to the default user name when no user context is set.
* @return the FileSystem instance, never null.
*/
private FileSystem fileSystemForUser() {
String user = IgfsUserContext.currentUser();
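// Fall back to the configured default user name when no user context is set.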
if (F.isEmpty(user))
user = IgfsUtils.fixUserName(dfltUsrName);
assert !F.isEmpty(user);
try {
return (FileSystem)factory.get(user);
}
catch (IOException ioe) {
throw new IgniteException(ioe);
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(HadoopIgfsSecondaryFileSystemDelegateImpl.class, this);
}
}