blob: 39392180a03a98005627619a0ede971d0668826c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using Org.Apache.REEF.Common.Files;
using Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.IO.FileSystem.Hadoop
{
/// <summary>
/// Helper class to execute commands
/// </summary>
internal sealed class HdfsCommandRunner
{
    /// <summary>
    /// The name of the <code>hdfs</code> command.
    /// </summary>
    private const string HdfsCommandName = "hdfs.cmd";

    /// <summary>
    /// The folder within <code>HadoopHome</code> that contains the <code>hdfs</code> command.
    /// </summary>
    private const string BinFolderName = "bin";

    /// <summary>
    /// The name of the HADOOP_HOME environment variable.
    /// </summary>
    private const string HadoopHomeEnvironmentVariableName = "HADOOP_HOME";

    private static readonly Logger Logger = Logger.GetLogger(typeof(HdfsCommandRunner));

    /// <summary>
    /// Absolute path to <code>hdfs.cmd</code>, resolved once in the constructor.
    /// </summary>
    private readonly string _hdfsCommandPath;

    /// <summary>
    /// The number of attempts <see cref="Run"/> makes before giving up.
    /// </summary>
    private readonly int _numberOfRetries;

    /// <summary>
    /// The timeout, in milliseconds, applied to each individual attempt.
    /// </summary>
    private readonly int _timeOutInMilliSeconds;

    /// <summary>
    /// Creates a runner for the <code>hdfs</code> command.
    /// The command is looked up under <paramref name="hadoopHome"/> when that value differs from
    /// <code>HadoopHome.DefaultValue</code> and names an existing folder. Otherwise the HADOOP_HOME
    /// environment variable is consulted.
    /// </summary>
    /// <param name="numberOfRetries">How many attempts <see cref="Run"/> makes before failing.</param>
    /// <param name="timeOutInMilliSeconds">Timeout, in milliseconds, for each attempt.</param>
    /// <param name="hadoopHome">The Hadoop installation folder configured by the user, or the default value.</param>
    /// <exception cref="FileNotFoundException">If the hdfs command can't be found.</exception>
    /// <exception cref="Exception">If HADOOP_HOME has to be consulted but is not set.</exception>
    [Inject]
    private HdfsCommandRunner([Parameter(typeof(NumberOfRetries))] int numberOfRetries,
        [Parameter(typeof(CommandTimeOut))] int timeOutInMilliSeconds,
        [Parameter(typeof(HadoopHome))] string hadoopHome)
    {
        _numberOfRetries = numberOfRetries;
        _timeOutInMilliSeconds = timeOutInMilliSeconds;

        if (!PathUtilities.AreNormalizedEquals(hadoopHome, HadoopHome.DefaultValue))
        {
            // The user provided a Hadoop Home folder.
            if (Directory.Exists(hadoopHome))
            {
                // The user provided folder does exist.
                _hdfsCommandPath = GetFullPathToHdfsCommand(hadoopHome);
            }
            else
            {
                // The user provided folder does not exist. Try the environment variable.
                Logger.Log(Level.Warning,
                    "The provided hadoop home folder {0} doesn't exist, trying environment variable {1} instead",
                    hadoopHome, HadoopHomeEnvironmentVariableName);
                _hdfsCommandPath = GetFullPathToHdfsCommandBasedOnEnvironmentVariable();
            }
        }
        else
        {
            // The user did not provide a Hadoop Home folder. Use the Environment variable.
            _hdfsCommandPath = GetFullPathToHdfsCommandBasedOnEnvironmentVariable();
        }

        // Make sure we found the command.
        if (!File.Exists(_hdfsCommandPath))
        {
            throw new FileNotFoundException("HDFS Command not found", _hdfsCommandPath);
        }
    }

    /// <summary>
    /// Runs <code>hdfs.cmd</code> with the given arguments, retrying only when an attempt times out.
    /// </summary>
    /// <param name="hdfsCommandLineArguments">Arguments passed verbatim to the hdfs command.</param>
    /// <returns>
    /// The result of the first attempt that finished within the timeout.
    /// A non-zero exit code is returned as-is; it does not trigger a retry.
    /// </returns>
    /// <exception cref="Exception">
    /// If every attempt timed out, or if the configured number of retries is zero or negative,
    /// in which case no attempt is made.
    /// </exception>
    internal CommandResult Run(string hdfsCommandLineArguments)
    {
        var processStartInfo = new ProcessStartInfo
        {
            FileName = _hdfsCommandPath,
            Arguments = hdfsCommandLineArguments,
            UseShellExecute = false,
            RedirectStandardOutput = true,
            RedirectStandardError = true
        };

        // Attempt numbers are 1-based, so the names read "HDFS_Attempt_1_of_N" .. "HDFS_Attempt_N_of_N".
        for (var attemptNumber = 1; attemptNumber <= _numberOfRetries; ++attemptNumber)
        {
            var processName = string.Format("HDFS_Attempt_{0}_of_{1}", attemptNumber, _numberOfRetries);
            var result = RunAttempt(processStartInfo, _timeOutInMilliSeconds, processName);
            if (null != result)
            {
                LogCommandOutput(result);
                return result;
            }
        }

        // If we reached here, we ran out of retries.
        throw new Exception(
            string.Format("HDFS Cmd {0} {1} could not be executed in the specified timeout & retry settings",
                _hdfsCommandPath, hdfsCommandLineArguments));
    }

    /// <summary>
    /// Utility method that constructs the full absolute path to the <code>hdfs</code> command.
    /// </summary>
    /// <param name="hadoopHome">The Hadoop installation folder (relative paths are resolved against the current directory).</param>
    /// <returns>The absolute path <code>{hadoopHome}\bin\hdfs.cmd</code>.</returns>
    private static string GetFullPathToHdfsCommand(string hadoopHome)
    {
        return Path.Combine(Path.GetFullPath(hadoopHome), BinFolderName, HdfsCommandName);
    }

    /// <summary>
    /// Constructs the path to the HDFS binary based on the HADOOP_HOME environment variable.
    /// </summary>
    /// <returns>The absolute path to <code>hdfs.cmd</code> under the folder named by HADOOP_HOME.</returns>
    /// <exception cref="Exception">If HADOOP_HOME is unset, empty, or blank.</exception>
    private static string GetFullPathToHdfsCommandBasedOnEnvironmentVariable()
    {
        var hadoopHomeFromEnv = Environment.GetEnvironmentVariable(HadoopHomeEnvironmentVariableName);
        Logger.Log(Level.Verbose, "{0} evaluated to {1}.", HadoopHomeEnvironmentVariableName, hadoopHomeFromEnv);

        // An empty or blank value is treated like an unset one. Otherwise Path.GetFullPath would
        // reject it later with a far less helpful ArgumentException.
        if (string.IsNullOrWhiteSpace(hadoopHomeFromEnv))
        {
            throw new Exception(HadoopHomeEnvironmentVariableName +
                                " not set and no path to the hadoop installation provided.");
        }

        return GetFullPathToHdfsCommand(hadoopHomeFromEnv);
    }

    /// <summary>
    /// Helper method to log the command result at <code>Level.Verbose</code>.
    /// </summary>
    /// <param name="result">The finished command whose stdout and stderr lines are logged.</param>
    private static void LogCommandOutput(CommandResult result)
    {
        using (var messageBuilder = new StringWriter())
        {
            messageBuilder.WriteLine("OUTPUT:");
            messageBuilder.WriteLine("----------------------------------------");
            foreach (var stdOut in result.StdOut)
            {
                messageBuilder.WriteLine("Out: " + stdOut);
            }

            messageBuilder.WriteLine("----------------------------------------");
            foreach (var stdErr in result.StdErr)
            {
                messageBuilder.WriteLine("Err: " + stdErr);
            }

            messageBuilder.WriteLine("----------------------------------------");

            // The process output is passed as an argument, not as the format string, so any
            // '{' or '}' it contains cannot be interpreted as format placeholders.
            Logger.Log(Level.Verbose, "{0}", messageBuilder.ToString());
        }
    }

    /// <summary>
    /// Attempts to run a process with a timeout.
    /// </summary>
    /// <returns>The result of the attempt or null in case of timeout.</returns>
    /// <param name="processStartInfo">The process start information. Stdout/stderr redirection is forced on.</param>
    /// <param name="timeOutInMilliSeconds">Timeout for the process.</param>
    /// <param name="processName">A human readable name used for logging purposes.</param>
    private static CommandResult RunAttempt(ProcessStartInfo processStartInfo, int timeOutInMilliSeconds,
        string processName)
    {
        // Setup the process.
        var outList = new List<string>();
        var errList = new List<string>();
        processStartInfo.RedirectStandardError = true;
        processStartInfo.RedirectStandardOutput = true;

        // Process owns OS handles (the process handle and the redirected pipes); release them on every path.
        using (var process = new Process { StartInfo = processStartInfo })
        {
            process.OutputDataReceived += delegate(object sender, DataReceivedEventArgs e)
            {
                if (!string.IsNullOrWhiteSpace(e.Data))
                {
                    outList.Add(e.Data.Trim());
                }
            };
            process.ErrorDataReceived += delegate(object sender, DataReceivedEventArgs e)
            {
                if (!string.IsNullOrWhiteSpace(e.Data))
                {
                    errList.Add(e.Data.Trim());
                }
            };

            // Start it
            process.Start();
            process.BeginErrorReadLine();
            process.BeginOutputReadLine();
            Logger.Log(Level.Verbose, "Waiting for {0}ms for process `{1}` to finish", timeOutInMilliSeconds,
                processName);

            // Deal with timeouts
            process.WaitForExit(timeOutInMilliSeconds);
            if (process.HasExited)
            {
                // WaitForExit(int) may return before the asynchronous output handlers have drained
                // the redirected streams. The parameterless overload waits for that, so the lists are
                // complete and no longer being appended to when they are handed to CommandResult.
                process.WaitForExit();

                // The happy path: Assemble an output
                return new CommandResult(outList, errList, process.ExitCode);
            }

            // If we didn't return above, the process timed out.
            Logger.Log(Level.Info, "The process `{0}` took longer than {1}ms to exit. Killing it.", processName,
                timeOutInMilliSeconds);
            try
            {
                process.Kill();
            }
            catch (InvalidOperationException)
            {
                // The process exited on its own between the HasExited check and Kill(). Nothing to kill;
                // the attempt is still treated as timed out.
            }

            // NOTE(review): Kill() terminates only hdfs.cmd itself, not any child processes it started.
            // If such a child keeps the redirected pipes open, this wait may block until it exits.
            process.WaitForExit();
            Logger.Log(Level.Info, "Killed process `{0}`.", processName);
            return null;
        }
    }
}
}