| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <string> |
| |
| #include <mesos/mesos.hpp> |
| |
| #include <stout/net.hpp> |
| #include <stout/option.hpp> |
| #include <stout/os.hpp> |
| #include <stout/strings.hpp> |
| |
| #include "hdfs/hdfs.hpp" |
| |
| using namespace mesos; |
| |
| using std::string; |
| |
| const char FILE_URI_PREFIX[] = "file://"; |
| const char FILE_URI_LOCALHOST[] = "file://localhost"; |
| |
| // Try to extract filename into directory. If filename is recognized as an |
| // archive it will be extracted and true returned; if not recognized then false |
| // will be returned. An Error is returned if the extraction command fails. |
| Try<bool> extract(const string& filename, const string& directory) |
| { |
| string command; |
| // Extract any .tgz, tar.gz, tar.bz2 or zip files. |
| if (strings::endsWith(filename, ".tgz") || |
| strings::endsWith(filename, ".tar.gz") || |
| strings::endsWith(filename, ".tbz2") || |
| strings::endsWith(filename, ".tar.bz2") || |
| strings::endsWith(filename, ".txz") || |
| strings::endsWith(filename, ".tar.xz")) { |
| command = "tar -C '" + directory + "' -xf"; |
| } else if (strings::endsWith(filename, ".zip")) { |
| command = "unzip -d '" + directory + "'"; |
| } else { |
| return false; |
| } |
| |
| command += " '" + filename + "'"; |
| int status = os::system(command); |
| if (status != 0) { |
| return Error("Failed to extract: command " + command + |
| " exited with status: " + stringify(status)); |
| } |
| |
| LOG(INFO) << "Extracted resource '" << filename |
| << "' into '" << directory << "'"; |
| |
| return true; |
| } |
| |
| |
| // Fetch URI into directory. |
| Try<string> fetch( |
| const string& uri, |
| const string& directory) |
| { |
| LOG(INFO) << "Fetching URI '" << uri << "'"; |
| |
| // Some checks to make sure using the URI value in shell commands |
| // is safe. TODO(benh): These should be pushed into the scheduler |
| // driver and reported to the user. |
| if (uri.find_first_of('\\') != string::npos || |
| uri.find_first_of('\'') != string::npos || |
| uri.find_first_of('\0') != string::npos) { |
| LOG(ERROR) << "URI contains illegal characters, refusing to fetch"; |
| return Error("Illegal characters in URI"); |
| } |
| |
| // Grab the resource using the hadoop client if it's one of the known schemes |
| // TODO(tarnfeld): This isn't very scalable with hadoop's pluggable |
| // filesystem implementations. |
| // TODO(matei): Enforce some size limits on files we get from HDFS |
| if (strings::startsWith(uri, "hdfs://") || |
| strings::startsWith(uri, "hftp://") || |
| strings::startsWith(uri, "s3://") || |
| strings::startsWith(uri, "s3n://")) { |
| Try<string> base = os::basename(uri); |
| if (base.isError()) { |
| LOG(ERROR) << "Invalid basename for URI: " << base.error(); |
| return Error("Invalid basename for URI"); |
| } |
| string path = path::join(directory, base.get()); |
| |
| HDFS hdfs; |
| |
| LOG(INFO) << "Downloading resource from '" << uri |
| << "' to '" << path << "'"; |
| Try<Nothing> result = hdfs.copyToLocal(uri, path); |
| if (result.isError()) { |
| LOG(ERROR) << "HDFS copyToLocal failed: " << result.error(); |
| return Error(result.error()); |
| } |
| |
| return path; |
| } else if (strings::startsWith(uri, "http://") || |
| strings::startsWith(uri, "https://") || |
| strings::startsWith(uri, "ftp://") || |
| strings::startsWith(uri, "ftps://")) { |
| string path = uri.substr(uri.find("://") + 3); |
| if (path.find("/") == string::npos || |
| path.size() <= path.find("/") + 1) { |
| LOG(ERROR) << "Malformed URL (missing path)"; |
| return Error("Malformed URI"); |
| } |
| |
| path = path::join(directory, path.substr(path.find_last_of("/") + 1)); |
| LOG(INFO) << "Downloading '" << uri << "' to '" << path << "'"; |
| Try<int> code = net::download(uri, path); |
| if (code.isError()) { |
| LOG(ERROR) << "Error downloading resource: " << code.error().c_str(); |
| return Error("Fetch of URI failed (" + code.error() + ")"); |
| } else if (code.get() != 200) { |
| LOG(ERROR) << "Error downloading resource, received HTTP/FTP return code " |
| << code.get(); |
| return Error("HTTP/FTP error (" + stringify(code.get()) + ")"); |
| } |
| |
| return path; |
| } else { // Copy the local resource. |
| string local = uri; |
| bool fileUri = false; |
| if (strings::startsWith(local, string(FILE_URI_LOCALHOST))) { |
| local = local.substr(sizeof(FILE_URI_LOCALHOST) - 1); |
| fileUri = true; |
| } else if (strings::startsWith(local, string(FILE_URI_PREFIX))) { |
| local = local.substr(sizeof(FILE_URI_PREFIX) - 1); |
| fileUri = true; |
| } |
| |
| if(fileUri && !strings::startsWith(local, "/")) { |
| return Error("File URI only supports absolute paths"); |
| } |
| |
| if (local.find_first_of("/") != 0) { |
| // We got a non-Hadoop and non-absolute path. |
| if (os::hasenv("MESOS_FRAMEWORKS_HOME")) { |
| local = path::join(os::getenv("MESOS_FRAMEWORKS_HOME"), local); |
| LOG(INFO) << "Prepended environment variable " |
| << "MESOS_FRAMEWORKS_HOME to relative path, " |
| << "making it: '" << local << "'"; |
| } else { |
| LOG(ERROR) << "A relative path was passed for the resource but the " |
| << "environment variable MESOS_FRAMEWORKS_HOME is not set. " |
| << "Please either specify this config option " |
| << "or avoid using a relative path"; |
| return Error("Could not resolve relative URI"); |
| } |
| } |
| |
| Try<string> base = os::basename(local); |
| if (base.isError()) { |
| LOG(ERROR) << base.error(); |
| return Error("Fetch of URI failed"); |
| } |
| |
| // Copy the resource to the directory. |
| string path = path::join(directory, base.get()); |
| std::ostringstream command; |
| command << "cp '" << local << "' '" << path << "'"; |
| LOG(INFO) << "Copying resource from '" << local |
| << "' to '" << directory << "'"; |
| |
| int status = os::system(command.str()); |
| if (status != 0) { |
| LOG(ERROR) << "Failed to copy '" << local |
| << "' : Exit status " << status; |
| return Error("Local copy failed"); |
| } |
| |
| return path; |
| } |
| } |
| |
| |
| int main(int argc, char* argv[]) |
| { |
| GOOGLE_PROTOBUF_VERIFY_VERSION; |
| |
| CommandInfo commandInfo; |
| // Construct URIs from the encoded environment string. |
| const std::string& uris = os::getenv("MESOS_EXECUTOR_URIS"); |
| foreach (const std::string& token, strings::tokenize(uris, " ")) { |
| // Delimiter between URI, execute permission and extract options |
| // Expected format: {URI}+[01][XN] |
| // {URI} - The actual URI for the asset to fetch |
| // [01] - 1 if the execute permission should be set else 0 |
| // [XN] - X if we should extract the URI (if it's compressed) else N |
| size_t pos = token.rfind("+"); |
| CHECK(pos != std::string::npos) |
| << "Invalid executor uri token in env " << token; |
| |
| CommandInfo::URI uri; |
| uri.set_value(token.substr(0, pos)); |
| uri.set_executable(token.substr(pos + 1, 1) == "1"); |
| uri.set_extract(token.substr(pos + 2, 1) == "X"); |
| |
| commandInfo.add_uris()->MergeFrom(uri); |
| } |
| |
| CHECK(os::hasenv("MESOS_WORK_DIRECTORY")) |
| << "Missing MESOS_WORK_DIRECTORY environment variable"; |
| std::string directory = os::getenv("MESOS_WORK_DIRECTORY"); |
| |
| // We cannot use Some in the ternary expression because the compiler needs to |
| // be able to infer the type, thus the explicit Option<string>. |
| // TODO(idownes): Add an os::hasenv that returns an Option<string>. |
| Option<std::string> user = os::hasenv("MESOS_USER") |
| ? Option<std::string>(os::getenv("MESOS_USER")) // Explicit so it compiles. |
| : None(); |
| |
| // Fetch each URI to a local file, chmod, then chown if a user is provided. |
| foreach (const CommandInfo::URI& uri, commandInfo.uris()) { |
| // Fetch the URI to a local file. |
| Try<string> fetched = fetch(uri.value(), directory); |
| if (fetched.isError()) { |
| EXIT(1) << "Failed to fetch: " << uri.value(); |
| } |
| |
| // Chmod the fetched URI if it's executable, else assume it's an archive |
| // that should be extracted. |
| if (uri.executable()) { |
| Try<Nothing> chmod = os::chmod( |
| fetched.get(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); |
| if (chmod.isError()) { |
| EXIT(1) << "Failed to chmod " << fetched.get() << ": " << chmod.error(); |
| } |
| } else if (uri.extract()) { |
| //TODO(idownes): Consider removing the archive once extracted. |
| // Try to extract the file if it's recognized as an archive. |
| Try<bool> extracted = extract(fetched.get(), directory); |
| if (extracted.isError()) { |
| EXIT(1) << "Failed to extract " |
| << fetched.get() << ":" << extracted.error(); |
| } |
| } else { |
| LOG(INFO) << "Skipped extracting path '" << fetched.get() << "'"; |
| } |
| |
| // Recursively chown the directory if a user is provided. |
| if (user.isSome()) { |
| Try<Nothing> chowned = os::chown(user.get(), directory); |
| if (chowned.isError()) { |
| EXIT(1) << "Failed to chown " << directory << ": " << chowned.error(); |
| } |
| } |
| } |
| |
| return 0; |
| } |