blob: 2fa5b02ac5467a2c0c58cd024a2c8b48c4cea2f0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stout/path.hpp>
#include <stout/strings.hpp>
#include <stout/os/mkdir.hpp>
#include "uri/fetchers/hadoop.hpp"
using std::vector;
using std::set;
using std::string;
using process::Failure;
using process::Future;
using process::Owned;
namespace mesos {
namespace uri {
// Registers the command line flags recognized by the hadoop fetcher
// plugin: the path to the hadoop client binary, and the URI schemes
// the client is expected to handle (defaults to "hdfs,hftp,s3,s3n").
HadoopFetcherPlugin::Flags::Flags()
{
  // No default for the client path: resolution of the binary (e.g.,
  // via PATH or HADOOP_HOME) is delegated to HDFS::create.
  add(&Flags::hadoop_client,
      "hadoop_client",
      "The path to the hadoop client\n");

  add(&Flags::hadoop_client_supported_schemes,
      "hadoop_client_supported_schemes",
      "A comma-separated list of the schemes supported by the hadoop client.\n",
      "hdfs,hftp,s3,s3n");
}
const char HadoopFetcherPlugin::NAME[] = "hadoop";
// Creates a hadoop fetcher plugin instance from the given flags.
//
// Instantiates an HDFS client wrapper around the configured hadoop
// client binary and records the set of URI schemes the plugin will
// claim. Returns an Error if the HDFS client cannot be created.
Try<Owned<Fetcher::Plugin>> HadoopFetcherPlugin::create(const Flags& flags)
{
  Try<Owned<HDFS>> hdfs = HDFS::create(flags.hadoop_client);
  if (hdfs.isError()) {
    return Error("Failed to create HDFS client: " + hdfs.error());
  }

  // Trim each token so that values like "hdfs, s3" (with whitespace
  // around the commas) do not produce schemes that can never match.
  // Empty tokens are dropped.
  set<string> schemes;
  for (const string& token :
       strings::tokenize(flags.hadoop_client_supported_schemes, ",")) {
    const string scheme = strings::trim(token);
    if (!scheme.empty()) {
      schemes.insert(scheme);
    }
  }

  return Owned<Fetcher::Plugin>(new HadoopFetcherPlugin(
      hdfs.get(),
      schemes));
}
// Returns the URI schemes (e.g., "hdfs", "s3") this plugin claims.
set<string> HadoopFetcherPlugin::schemes() const
{
  // Hand back a copy of the scheme set captured at construction time.
  set<string> supported = schemes_;
  return supported;
}
// Returns the plugin's registration name ("hadoop").
string HadoopFetcherPlugin::name() const
{
  return string(NAME);
}
// Fetches the artifact named by 'uri' into 'directory' using the
// hadoop client. 'data' is unused by this plugin. The output file
// keeps the basename of the URI path. Returns a Failure if the URI
// has no path or the target directory cannot be created.
Future<Nothing> HadoopFetcherPlugin::fetch(
    const URI& uri,
    const string& directory,
    const Option<string>& data) const
{
  // TODO(jieyu): Validate the given URI.
  if (!uri.has_path()) {
    return Failure("URI path is not specified");
  }

  Try<Nothing> mkdirResult = os::mkdir(directory);
  if (mkdirResult.isError()) {
    return Failure(
        "Failed to create directory '" + directory + "': " +
        mkdirResult.error());
  }

  // NOTE: We ignore the scheme prefix if the host in URI is not
  // specified. This is the case when the host is set using the hadoop
  // configuration file.
  const string source = uri.has_host() ? stringify(uri) : uri.path();

  // TODO(jieyu): Allow user to specify the name of the output file.
  const string destination =
    path::join(directory, Path(uri.path()).basename());

  return hdfs.get()->copyToLocal(source, destination);
}
} // namespace uri {
} // namespace mesos {