blob: bbfeddef106c598d8379ced085ef0605c4b2f380 [file] [log] [blame]
#ifndef __HDFS_HPP__
#define __HDFS_HPP__
#include <sstream>
#include <stout/check.hpp>
#include <stout/error.hpp>
#include <stout/nothing.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
// TODO(benh): We should get the hostname:port (or ip:port) of the
// server via:
//
// hadoop dfsadmin -report | grep Name: | awk '{ print $2 }'
//
// The advantage of doing this is then we can explicitly use the
// 'hdfs://hostname' prefix when we're trying to do copies to avoid
// silent failures when HDFS is down and the tools just copy
// locally.
//
// Note that if HDFS is not on port 9000 then we'll also need to do an
// HTTP GET on hostname:port and grab the information in the
// <title>...</title> (this is the best hack I can think of to get
// 'fs.default.name' given the tools available).
// Thin wrapper around the `hadoop fs' command line client. Each
// operation formats a shell command, runs it through os::shell and
// maps the outcome onto a Try.
struct HDFS
{
  // Look for `hadoop' first where proposed, otherwise, look for
  // HADOOP_HOME, otherwise, assume it's on the PATH.
  explicit HDFS(const std::string& _hadoop)
    : hadoop(os::exists(_hadoop) ? _hadoop : defaultHadoop()) {}

  // Look for `hadoop' in HADOOP_HOME or assume it's on the PATH.
  HDFS() : hadoop(defaultHadoop()) {}

  // Runs `hadoop fs -test -e' on 'path'. Returns true if the command
  // exits with status 0 and false for any other status; an Error is
  // returned only if the command could not be run at all.
  Try<bool> exists(std::string path) const
  {
    // Make sure 'path' starts with a '/' (unless it's an hdfs:// URI).
    path = absolute(path);

    Try<std::string> command = strings::format(
        "%s fs -test -e %s", hadoop, quote(path));

    CHECK_SOME(command);

    // The output is not interesting here, only the exit status.
    Try<int> status = os::shell(NULL, command.get() + " 2>&1");

    if (status.isError()) {
      return Error(status.error());
    }

    return status.get() == 0;
  }

  // Runs `hadoop fs -rm' on 'path'. On a non-zero status the returned
  // Error holds the command followed by its combined output.
  Try<Nothing> rm(std::string path) const
  {
    // Make sure 'path' starts with a '/' (unless it's an hdfs:// URI).
    path = absolute(path);

    Try<std::string> command = strings::format(
        "%s fs -rm %s", hadoop, quote(path));

    CHECK_SOME(command);

    return run(command.get());
  }

  // Copies the local file 'from' to 'to' in HDFS using
  // `hadoop fs -copyFromLocal'. Fails early if 'from' does not exist
  // locally. On a non-zero status the returned Error holds the
  // command followed by its combined output.
  Try<Nothing> copyFromLocal(
      const std::string& from,
      std::string to) const
  {
    if (!os::exists(from)) {
      return Error("Failed to find " + from);
    }

    // Make sure 'to' starts with a '/' (unless it's an hdfs:// URI).
    to = absolute(to);

    // Copy to HDFS.
    Try<std::string> command = strings::format(
        "%s fs -copyFromLocal %s %s", hadoop, quote(from), quote(to));

    CHECK_SOME(command);

    return run(command.get());
  }

  // Copies 'from' to the local path 'to' using
  // `hadoop fs -copyToLocal'. Both paths are passed to the tool as
  // given (no leading '/' is forced onto 'from'). On a non-zero
  // status the returned Error holds the command followed by its
  // combined output.
  Try<Nothing> copyToLocal(
      const std::string& from,
      const std::string& to) const
  {
    // Copy from HDFS.
    Try<std::string> command = strings::format(
        "%s fs -copyToLocal %s %s", hadoop, quote(from), quote(to));

    CHECK_SOME(command);

    return run(command.get());
  }

private:
  // Returns the `hadoop' binary under HADOOP_HOME if that environment
  // variable is set, otherwise plain "hadoop" (resolved via the PATH).
  static std::string defaultHadoop()
  {
    if (os::hasenv("HADOOP_HOME")) {
      return path::join(os::getenv("HADOOP_HOME"), "bin/hadoop");
    }

    return "hadoop";
  }

  // Returns 'hdfsPath' with a leading '/'. Paths that already carry
  // the "hdfs://" scheme are returned untouched, since prefixing
  // them with '/' would no longer name the same location.
  static std::string absolute(const std::string& hdfsPath)
  {
    if (hdfsPath.compare(0, 7, "hdfs://") == 0) {
      return hdfsPath;
    }

    return path::join("", hdfsPath);
  }

  // Wraps 's' in single quotes so the shell treats it as one literal
  // word. An embedded single quote cannot appear inside a
  // single-quoted string, so each one is emitted as '\'' (close the
  // quote, an escaped quote, reopen the quote).
  static std::string quote(const std::string& s)
  {
    std::string quoted = "'";

    for (std::string::size_type i = 0; i < s.size(); ++i) {
      if (s[i] == '\'') {
        quoted += "'\\''";
      } else {
        quoted += s[i];
      }
    }

    quoted += "'";

    return quoted;
  }

  // Runs 'command' with stderr redirected into stdout. Returns an
  // Error if the command could not be run, or if it finished with a
  // non-zero status (in which case the error text is the command
  // followed by everything it printed).
  static Try<Nothing> run(const std::string& command)
  {
    std::ostringstream output;

    Try<int> status = os::shell(&output, command + " 2>&1");

    if (status.isError()) {
      return Error(status.error());
    } else if (status.get() != 0) {
      return Error(command + "\n" + output.str());
    }

    return Nothing();
  }

  const std::string hadoop;
};
#endif // __HDFS_HPP__