blob: a38e547523de2ba9cf3e7c1b4f5c7ca0e2f4c474 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.IO.FileSystem.Hadoop
{
/// <summary>
/// Implements the IFileSystem interface for HDFS using external commands.
/// </summary>
/// <remarks>
/// Note that operations with this class are enormously slow. If you can, use a more native way to access the file system
/// in question.
/// </remarks>
/// <a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html">
/// FileSystemShell</a>
internal sealed class HadoopFileSystem : IFileSystem
{
private static readonly Logger Logger = Logger.GetLogger(typeof(HadoopFileSystem));
private static readonly Regex NoSuchFileOrDirectoryRegEx = new Regex("^ls: `.*': No such file or directory");
private static readonly Regex LsFirstLineRegex = new Regex("^Found .* items");
private readonly HdfsCommandRunner _commandRunner;
private readonly string _uriPrefix;
private const string PrefixTemplate = "{0}://{1}/";
[Inject]
private HadoopFileSystem(HdfsCommandRunner commandRunner)
{
_commandRunner = commandRunner;
_uriPrefix = GetUriPrefix();
}
/// <summary>
/// Create Uri from a given file name
/// If the path already contains prefix, use it directly and verify the prefix after it is created.
/// Otherwise add the prefix in fron of the relative path.
/// If path is null or the prefix doesn't match the prefix in the FileSystem, throw ArgumentException
/// </summary>
/// <param name="path"></param>
/// <returns></returns>
public Uri CreateUriForPath(string path)
{
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "CreateUriForPath with path: {0}, _uriPrefix: {1}.", path, _uriPrefix));
if (path == null)
{
throw new ArgumentException("null path passed in CreateUriForPath");
}
Uri uri;
try
{
uri = new Uri(path);
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Uri {0} created in CreateUriForPath.", uri.AbsolutePath));
}
catch (UriFormatException)
{
uri = new Uri(_uriPrefix + path);
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Uri {0} created in CreateUriForPath with prefix added.", uri.AbsolutePath));
}
return uri;
}
/// <summary>
/// Not implemented by this IFileSystem.
/// </summary>
/// <param name="remoteFileUri"></param>
/// <returns></returns>
public FileStatus GetFileStatus(Uri remoteFileUri)
{
throw new NotImplementedException("GetFileStatus() is not implemented for HadoopFileSystem");
}
/// <summary>
/// Not implemented by this IFileSystem.
/// </summary>
/// <param name="fileUri"></param>
/// <returns></returns>
public Stream Open(Uri fileUri)
{
throw new NotImplementedException(
"Open() is not supported by HadoopFileSystem. Use CopyToLocal and open the local file instead.");
}
/// <summary>
/// Not implemented by this IFileSystem.
/// </summary>
/// <param name="fileUri"></param>
/// <returns></returns>
public Stream Create(Uri fileUri)
{
throw new NotImplementedException(
"Create() is not supported by HadoopFileSystem. Create a local file and use CopyFromLocal instead.");
}
public void Delete(Uri fileUri)
{
// Delete the file via the hdfs command line.
_commandRunner.Run("dfs -rm " + fileUri.AbsolutePath);
}
public bool Exists(Uri fileUri)
{
// This determines the existence of a file based on the 'ls' command.
// Ideally, we'd use the 'test' command's return value, but we did not find a way to access that.
return
_commandRunner.Run("dfs -ls " + fileUri.AbsolutePath).StdErr
.All(line => !NoSuchFileOrDirectoryRegEx.IsMatch(line));
}
public void Copy(Uri sourceUri, Uri destinationUri)
{
_commandRunner.Run("dfs -cp " + sourceUri.AbsolutePath + " " + destinationUri.AbsolutePath);
}
public void CopyToLocal(Uri remoteFileUri, string localName)
{
_commandRunner.Run("dfs -get " + remoteFileUri.AbsolutePath + " " + localName);
}
public void CopyFromLocal(string localFileName, Uri remoteFileUri)
{
_commandRunner.Run("dfs -put " + localFileName + " " + remoteFileUri.AbsolutePath);
}
public void CreateDirectory(Uri directoryUri)
{
_commandRunner.Run("dfs -mkdir " + directoryUri.AbsolutePath);
}
public void DeleteDirectory(Uri directoryUri)
{
_commandRunner.Run("dfs -rmdir " + directoryUri.AbsolutePath);
}
public IEnumerable<Uri> GetChildren(Uri directoryUri)
{
return _commandRunner.Run("dfs -ls " + directoryUri.AbsolutePath)
.StdOut.Where(line => !LsFirstLineRegex.IsMatch(line))
.Select(line => line.Split())
.Select(x => new Uri(x[x.Length - 1]));
}
private string GetUriPrefix()
{
return _commandRunner.Run("getconf -confKey fs.default.name").StdOut.First();
}
}
}