blob: 18f17231b92b84d0b0e4e15837d0e44ce8758cdf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
#ifndef __HDFS_HPP__
#define __HDFS_HPP__
#include <sstream>
#include <vector>
#include <stout/check.hpp>
#include <stout/error.hpp>
#include <stout/nothing.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
// TODO(benh): We should get the hostname:port (or ip:port) of the
// server via:
//
// hadoop dfsadmin -report | grep Name: | awk '{ print $2 }'
//
// The advantage of doing this is then we can explicitly use the
// 'hdfs://hostname' prefix when we're trying to do copies to avoid
// silent failures when HDFS is down and the tools just copies
// locally.
//
// Note that if HDFS is not on port 9000 then we'll also need to do an
// HTTP GET on hostname:port and grab the information in the
// <title>...</title> (this is the best hack I can think of to get
// 'fs.default.name' given the tools available).
struct HDFS
{
  // Look for `hadoop' first where proposed, otherwise, look for
  // HADOOP_HOME, otherwise, assume it's on the PATH.
  explicit HDFS(const std::string& _hadoop)
    : hadoop(os::exists(_hadoop)
             ? _hadoop
             : (os::getenv("HADOOP_HOME").isSome()
                ? path::join(os::getenv("HADOOP_HOME").get(), "bin/hadoop")
                : "hadoop")) {}

  // Look for `hadoop' in HADOOP_HOME or assume it's on the PATH.
  HDFS()
    : hadoop(os::getenv("HADOOP_HOME").isSome()
             ? path::join(os::getenv("HADOOP_HOME").get(), "bin/hadoop")
             : "hadoop") {}

  // Check if the hadoop client is available at the path that was set.
  // This is done by executing `hadoop version` and checking for
  // status code == 0.
  //
  // NOTE: This never returns 'false'; a failed or missing client
  // surfaces as an Error (callers should treat Error as unavailable).
  Try<bool> available()
  {
    Try<std::string> command = strings::format("%s version", hadoop);

    CHECK_SOME(command);

    // We are piping stderr to stdout so that we can see the error (if
    // any) in the logs emitted by `os::shell()` in case of failure.
    Try<std::string> out = os::shell(command.get() + " 2>&1");

    if (out.isError()) {
      return Error(out.error());
    }

    return true;
  }

  // Check whether 'path' exists in HDFS via `hadoop fs -test -e`.
  //
  // NOTE: This never returns 'false'; a non-existent path makes the
  // test command exit non-zero, which surfaces as an Error here.
  Try<bool> exists(std::string path)
  {
    // Make sure 'path' starts with a '/'.
    path = path::join("", path);

    Try<std::string> command = strings::format(
        "%s fs -test -e '%s'", hadoop, path);

    CHECK_SOME(command);

    // We are piping stderr to stdout so that we can see the error (if
    // any) in the logs emitted by `os::shell()` in case of failure.
    Try<std::string> out = os::shell(command.get() + " 2>&1");

    if (out.isError()) {
      return Error(out.error());
    }

    return true;
  }

  // Return the disk usage of 'path' in HDFS, in bytes.
  Try<Bytes> du(std::string path)
  {
    // Make sure 'path' starts with a '/'.
    path = path::join("", path);

    // NOTE: We deliberately do NOT pass '-h' (human readable) here:
    // output like "1.2 K <path>" cannot be parsed by the
    // numify<size_t> below (and the unit suffix would also break the
    // two-token check). Without '-h', `hadoop fs -du` reports the
    // exact size in bytes.
    Try<std::string> command = strings::format(
        "%s fs -du '%s'", hadoop, path);

    CHECK_SOME(command);

    // We are piping stderr to stdout so that we can see the error (if
    // any) in the logs emitted by `os::shell()` in case of failure.
    //
    // TODO(marco): this was the existing logic, but not sure it is
    // actually needed.
    Try<std::string> out = os::shell(command.get() + " 2>&1");

    if (out.isError()) {
      return Error("HDFS du failed: " + out.error());
    }

    // Expected output is a single "<size> <path>" line.
    const std::vector<std::string>& s = strings::split(out.get(), " ");
    if (s.size() != 2) {
      return Error("HDFS du returned an unexpected number of results: '" +
                   out.get() + "'");
    }

    Result<size_t> size = numify<size_t>(s[0]);
    if (size.isError()) {
      return Error("HDFS du returned unexpected format: " + size.error());
    } else if (size.isNone()) {
      return Error("HDFS du returned unexpected format");
    }

    return Bytes(size.get());
  }

  // Remove 'path' from HDFS via `hadoop fs -rm`.
  Try<Nothing> rm(std::string path)
  {
    // Make sure 'path' starts with a '/'.
    path = path::join("", path);

    Try<std::string> command = strings::format(
        "%s fs -rm '%s'", hadoop, path);

    CHECK_SOME(command);

    // Pipe stderr to stdout (as in `available()` et al.) so that the
    // hadoop diagnostic is captured in the error on failure.
    Try<std::string> out = os::shell(command.get() + " 2>&1");

    if (out.isError()) {
      return Error(out.error());
    }

    return Nothing();
  }

  // Copy the local file 'from' into HDFS at 'to'.
  Try<Nothing> copyFromLocal(
      const std::string& from,
      std::string to)
  {
    if (!os::exists(from)) {
      return Error("Failed to find " + from);
    }

    // Make sure 'to' starts with a '/'.
    to = path::join("", to);

    // Copy to HDFS.
    Try<std::string> command = strings::format(
        "%s fs -copyFromLocal '%s' '%s'", hadoop, from, to);

    CHECK_SOME(command);

    // Pipe stderr to stdout (as in `available()` et al.) so that the
    // hadoop diagnostic is captured in the error on failure.
    Try<std::string> out = os::shell(command.get() + " 2>&1");

    if (out.isError()) {
      return Error(out.error());
    }

    return Nothing();
  }

  // Copy 'from' in HDFS to the local path 'to'.
  Try<Nothing> copyToLocal(
      const std::string& from,
      const std::string& to)
  {
    // Copy from HDFS.
    Try<std::string> command = strings::format(
        "%s fs -copyToLocal '%s' '%s'", hadoop, from, to);

    CHECK_SOME(command);

    // Pipe stderr to stdout (as in `available()` et al.) so that the
    // hadoop diagnostic is captured in the error on failure.
    Try<std::string> out = os::shell(command.get() + " 2>&1");

    if (out.isError()) {
      return Error(out.error());
    }

    return Nothing();
  }

private:
  // Absolute path to (or bare name of, if found on the PATH) the
  // `hadoop' client binary.
  const std::string hadoop;
};
#endif // __HDFS_HPP__