// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <tuple>
#include <vector>

#include <process/collect.hpp>
#include <process/io.hpp>
#include <process/subprocess.hpp>

#include <stout/bytes.hpp>
#include <stout/check.hpp>
#include <stout/error.hpp>
#include <stout/foreach.hpp>
#include <stout/numify.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>

#include <stout/os/constants.hpp>
#include <stout/os/exists.hpp>
#include <stout/os/shell.hpp>

#include "common/status_utils.hpp"

#include "hdfs/hdfs.hpp"

#include "uri/schemes/hdfs.hpp"

using namespace process;

using std::string;
using std::vector;
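

// Holds a subprocess's exit status (in the wait(2) encoding; None if
// the subprocess could not be reaped) along with its captured stdout
// and stderr.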
struct CommandResult
{
  Option<int> status;
  string out;
  string err;
};


static Future<CommandResult> result(const Subprocess& s)
{
  CHECK_SOME(s.out());
  CHECK_SOME(s.err());

  return await(
      s.status(),
      io::read(s.out().get()),
      io::read(s.err().get()))
    .then([](const std::tuple<
        Future<Option<int>>,
        Future<string>,
        Future<string>>& t) -> Future<CommandResult> {
      const Future<Option<int>>& status = std::get<0>(t);
      if (!status.isReady()) {
        return Failure(
            "Failed to get the exit status of the subprocess: " +
            (status.isFailed() ? status.failure() : "discarded"));
      }

      const Future<string>& output = std::get<1>(t);
      if (!output.isReady()) {
        return Failure(
            "Failed to read stdout from the subprocess: " +
            (output.isFailed() ? output.failure() : "discarded"));
      }

      const Future<string>& error = std::get<2>(t);
      if (!error.isReady()) {
        return Failure(
            "Failed to read stderr from the subprocess: " +
            (error.isFailed() ? error.failure() : "discarded"));
      }

      CommandResult result;
      result.status = status.get();
      result.out = output.get();
      result.err = error.get();

      return result;
    });
}
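

// NOTE: A minimal sketch of the calling convention `result()` assumes
// (illustrative only; `ls` stands in for any command):
//
//   Try<Subprocess> s = subprocess(
//       "ls",
//       {"ls", "/tmp"},
//       Subprocess::PATH(os::DEV_NULL),
//       Subprocess::PIPE(),   // `result()` CHECKs that stdout is piped.
//       Subprocess::PIPE());  // Ditto for stderr.
//
// `await()` is used above rather than `collect()` because it waits for
// all three futures to complete in any state (ready, failed, or
// discarded), which lets us report exactly which of status, stdout, or
// stderr went wrong instead of failing fast.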


Try<Owned<HDFS>> HDFS::create(const Option<string>& _hadoop)
{
  // Determine the hadoop client to use. If the user has specified it,
  // use it. If not, look for the HADOOP_HOME environment variable. If
  // the environment variable is not set, assume the client is on the
  // PATH.
  string hadoop;

  if (_hadoop.isSome()) {
    hadoop = _hadoop.get();
  } else {
    Option<string> hadoopHome = os::getenv("HADOOP_HOME");
    if (hadoopHome.isSome()) {
      hadoop = path::join(hadoopHome.get(), "bin", "hadoop");
    } else {
      hadoop = "hadoop";
    }
  }

  // Check if the hadoop client is available.
  Try<Subprocess> subprocess = process::subprocess(hadoop + " version 2>&1");
  if (subprocess.isError()) {
    return Error("Failed to exec hadoop subprocess: " + subprocess.error());
  }

  // NOTE: `status().get()` blocks until the subprocess has been reaped.
  Option<int> status = subprocess->status().get();
  if (status.isNone()) {
    return Error("No status found for 'hadoop version' command");
  }

  // Check the final status of the command.
  if (status.get() != 0) {
    return Error(
        "Hadoop client is not available, exit status: " +
        stringify(status.get()));
  }

  return Owned<HDFS>(new HDFS(hadoop));
}
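

// A minimal usage sketch for `create()` (illustrative; assumes a
// `hadoop` binary is resolvable via HADOOP_HOME or the PATH):
//
//   Try<Owned<HDFS>> hdfs = HDFS::create();
//   if (hdfs.isError()) {
//     // 'hadoop version' failed or could not be executed.
//   }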


Try<mesos::URI> HDFS::parse(const string& uri)
{
  size_t schemePos = uri.find("://");
  if (schemePos == string::npos) {
    return Error("Missing scheme in url string");
  }

  const string uriPath = uri.substr(schemePos + 3);

  size_t pathPos = uriPath.find_first_of('/');
  if (pathPos == 0) {
    return mesos::uri::hdfs(uriPath);
  }

  // If a path is specified in the URL, try to capture the host and
  // path separately.
  string host = uriPath;
  string path = "/";
  if (pathPos != string::npos) {
    host = host.substr(0, pathPos);
    path = uriPath.substr(pathPos);
  }

  if (host.empty()) {
    return mesos::uri::hdfs(path);
  }

  // NOTE: `tokenize()` drops empty tokens, so a host consisting solely
  // of delimiters (e.g., ":") yields an empty vector; guard against
  // indexing into it.
  const vector<string> tokens = strings::tokenize(host, ":");

  if (tokens.empty() || tokens[0].empty()) {
    return Error("Host not found in url");
  }

  if (tokens.size() > 2) {
    return Error("Found multiple ports in url");
  }

  Option<int> port;
  if (tokens.size() == 2) {
    Try<int> numifyPort = numify<int>(tokens[1]);
    if (numifyPort.isError()) {
      return Error("Failed to parse port: " + numifyPort.error());
    }

    port = numifyPort.get();
  } else {
    // Default port for HDFS.
    port = 8020;
  }

  return mesos::uri::hdfs(path, tokens[0], port.get());
}
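

// Illustrative mappings for `parse()` (derived from the logic above):
//
//   "hdfs://localhost:9000/a/b" -> host "localhost", port 9000, path "/a/b"
//   "hdfs://localhost/a/b"      -> host "localhost", port 8020 (default)
//   "hdfs:///a/b"               -> no host or port, path "/a/b"
//   "localhost/a/b"             -> Error: missing scheme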


// An HDFS client path must be either a full URI or an absolute path. If it is
// a relative path, prepend "/" to make it absolute. (Note that all URI schemes
// supported by the HDFS client contain "://" whereas file paths never do.)
static string normalize(const string& hdfsPath)
{
  if (strings::contains(hdfsPath, "://") || // A URI or a malformed path.
      path::absolute(hdfsPath)) {           // Already an absolute path.
    return hdfsPath;
  }

  // A relative, non-URI file path. Prepend "/".
  return path::join("", hdfsPath);
}
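

// Illustrative `normalize()` mappings (note that stout's
// path::join("", s) yields "/" + s):
//
//   "hdfs://host/a/b" -> "hdfs://host/a/b"  (URI, returned untouched)
//   "/a/b"            -> "/a/b"             (already absolute)
//   "a/b"             -> "/a/b"             (made absolute)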


Future<bool> HDFS::exists(const string& path)
{
  Try<Subprocess> s = subprocess(
      hadoop,
      {"hadoop", "fs", "-test", "-e", normalize(path)},
      Subprocess::PATH(os::DEV_NULL),
      Subprocess::PIPE(),
      Subprocess::PIPE());

  if (s.isError()) {
    return Failure("Failed to execute the subprocess: " + s.error());
  }

  return result(s.get())
    .then([](const CommandResult& result) -> Future<bool> {
      if (result.status.isNone()) {
        return Failure("Failed to reap the subprocess");
      }

      if (WSUCCEEDED(result.status.get())) {
        return true;
      }

      if (WIFEXITED(result.status.get()) &&
          WEXITSTATUS(result.status.get()) == 1) {
        return false;
      }

      return Failure(
          "Unexpected result from the subprocess: "
          "status='" + WSTRINGIFY(result.status.get()) + "', " +
          "stdout='" + result.out + "', " +
          "stderr='" + result.err + "'");
    });
}
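

// NOTE: 'hadoop fs -test -e' exits 0 if the path exists and 1 if it
// does not, hence the mapping above: exit status 0 -> true, 1 -> false,
// anything else (including death by signal) -> Failure. An illustrative
// usage sketch:
//
//   hdfs.get()->exists("data/file")  // normalize() yields "/data/file".
//     .then([](bool found) -> Future<Nothing> {
//       // ...
//       return Nothing();
//     });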


Future<Bytes> HDFS::du(const string& _path)
{
  const string path = normalize(_path);

  Try<Subprocess> s = subprocess(
      hadoop,
      {"hadoop", "fs", "-du", path},
      Subprocess::PATH(os::DEV_NULL),
      Subprocess::PIPE(),
      Subprocess::PIPE());

  if (s.isError()) {
    return Failure("Failed to execute the subprocess: " + s.error());
  }

  return result(s.get())
    .then([path](const CommandResult& result) -> Future<Bytes> {
      if (result.status.isNone()) {
        return Failure("Failed to reap the subprocess");
      }

      if (result.status.get() != 0) {
        return Failure(
            "Unexpected result from the subprocess: "
            "status='" + stringify(result.status.get()) + "', " +
            "stdout='" + result.out + "', " +
            "stderr='" + result.err + "'");
      }

      // We expect 2 space-separated output fields: a number of bytes,
      // then the name of the path we gave. The 'hadoop' command can
      // emit various WARN or other log messages, so we make an effort
      // to scan for the fields we want.
      foreach (const string& line, strings::tokenize(result.out, "\n")) {
        // Note that we use tokenize() rather than split() since
        // fields can be delimited by multiple spaces.
        vector<string> fields = strings::tokenize(line, " \t");

        // There might be 2 or 3 fields; see HADOOP-6857. The 2-field
        // version contains the object size and path; the 3-field
        // version contains the object size, object storage size, and
        // path.
        if ((fields.size() == 2 || fields.size() == 3) &&
            fields.back() == path) {
          Result<size_t> size = numify<size_t>(fields[0]);
          if (size.isSome()) {
            return Bytes(size.get());
          }
        }
      }

      return Failure("Unexpected output format: '" + result.out + "'");
    });
}
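

// Illustrative 'hadoop fs -du' output accepted by the parser above
// (see HADOOP-6857 for the 2- vs. 3-field formats):
//
//   1024  /a/b          <- 2 fields: size, path.
//   1024  3072  /a/b    <- 3 fields: size, storage size, path.
//
// Interleaved WARN/log lines are skipped because their last field will
// not equal the queried path.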


Future<Nothing> HDFS::rm(const string& path)
{
  Try<Subprocess> s = subprocess(
      hadoop,
      {"hadoop", "fs", "-rm", normalize(path)},
      Subprocess::PATH(os::DEV_NULL),
      Subprocess::PIPE(),
      Subprocess::PIPE());

  if (s.isError()) {
    return Failure("Failed to execute the subprocess: " + s.error());
  }

  return result(s.get())
    .then([](const CommandResult& result) -> Future<Nothing> {
      if (result.status.isNone()) {
        return Failure("Failed to reap the subprocess");
      }

      if (result.status.get() != 0) {
        return Failure(
            "Unexpected result from the subprocess: "
            "status='" + stringify(result.status.get()) + "', " +
            "stdout='" + result.out + "', " +
            "stderr='" + result.err + "'");
      }

      return Nothing();
    });
}


Future<Nothing> HDFS::copyFromLocal(const string& from, const string& to)
{
  if (!os::exists(from)) {
    return Failure("Failed to find '" + from + "'");
  }

  Try<Subprocess> s = subprocess(
      hadoop,
      {"hadoop", "fs", "-copyFromLocal", from, normalize(to)},
      Subprocess::PATH(os::DEV_NULL),
      Subprocess::PIPE(),
      Subprocess::PIPE());

  if (s.isError()) {
    return Failure("Failed to execute the subprocess: " + s.error());
  }

  return result(s.get())
    .then([](const CommandResult& result) -> Future<Nothing> {
      if (result.status.isNone()) {
        return Failure("Failed to reap the subprocess");
      }

      if (result.status.get() != 0) {
        return Failure(
            "Unexpected result from the subprocess: "
            "status='" + stringify(result.status.get()) + "', " +
            "stdout='" + result.out + "', " +
            "stderr='" + result.err + "'");
      }

      return Nothing();
    });
}


Future<Nothing> HDFS::copyToLocal(const string& from, const string& to)
{
  Try<Subprocess> s = subprocess(
      hadoop,
      {"hadoop", "fs", "-copyToLocal", normalize(from), to},
      Subprocess::PATH(os::DEV_NULL),
      Subprocess::PIPE(),
      Subprocess::PIPE());

  if (s.isError()) {
    return Failure("Failed to execute the subprocess: " + s.error());
  }

  return result(s.get())
    .then([](const CommandResult& result) -> Future<Nothing> {
      if (result.status.isNone()) {
        return Failure("Failed to reap the subprocess");
      }

      if (result.status.get() != 0) {
        return Failure(
            "Unexpected result from the subprocess: "
            "status='" + stringify(result.status.get()) + "', " +
            "stdout='" + result.out + "', " +
            "stderr='" + result.err + "'");
      }

      return Nothing();
    });
}