blob: f9f1731b0e1b888839a7eec3dd989df88b1bb07d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.knox.gateway.shell.commands;
import java.io.Console;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import org.apache.knox.gateway.shell.CredentialCollectionException;
import org.apache.knox.gateway.shell.CredentialCollector;
import org.apache.knox.gateway.shell.KnoxSession;
import org.apache.knox.gateway.shell.KnoxShellException;
import org.apache.knox.gateway.shell.hdfs.Hdfs;
import org.apache.knox.gateway.shell.hdfs.Status.Response;
import org.apache.knox.gateway.shell.table.KnoxShellTable;
import org.apache.knox.gateway.util.JsonUtils;
import org.codehaus.groovy.tools.shell.Groovysh;
public class WebHDFSCommand extends AbstractKnoxShellCommand {
private static final String DESC = "POSIX style commands for Hadoop Filesystems";
private static final String USAGE = "Usage: \n" +
" :fs mounts \n" +
" :fs mount target-topology-url mountpoint-name \n" +
" :fs unmount mountpoint-name \n" +
" :fs ls {target-path} \n" +
" :fs cat {target-path} \n" +
" :fs get {from-path} {to-path} \n" +
" :fs put {from-path} {tp-path} \n" +
" :fs rm {target-path} \n" +
" :fs mkdir {dir-path} \n";
private Map<String, KnoxSession> sessions = new HashMap<>();
public WebHDFSCommand(Groovysh shell) {
super(shell, ":filesystem", ":fs", DESC, USAGE, DESC);
}
@Override
public Object execute(List<String> args) {
Map<String, String> mounts = getMountPoints();
if (args.isEmpty()) {
args.add("ls");
}
if (args.get(0).equalsIgnoreCase("mount")) {
String url = args.get(1);
String mountPoint = args.get(2);
return mount(mounts, url, mountPoint);
}
else if (args.get(0).equalsIgnoreCase("unmount")) {
String mountPoint = args.get(1);
unmount(mounts, mountPoint);
}
else if (args.get(0).equalsIgnoreCase("mounts")) {
return listMounts(mounts);
}
else if (args.get(0).equalsIgnoreCase("ls")) {
String path = args.get(1);
return listStatus(mounts, path);
}
else if (args.get(0).equalsIgnoreCase("put")) {
// Hdfs.put( session ).file( dataFile ).to( dataDir + "/" + dataFile ).now()
// :fs put from-path to-path
String localFile = args.get(1);
String path = args.get(2);
int permission = 755;
if (args.size() >= 4) {
permission = Integer.parseInt(args.get(3));
}
return put(mounts, localFile, path, permission);
}
else if (args.get(0).equalsIgnoreCase("rm")) {
// Hdfs.rm( session ).file( dataFile ).now()
// :fs rm target-path
String path = args.get(1);
return remove(mounts, path);
}
else if (args.get(0).equalsIgnoreCase("cat")) {
// println Hdfs.get( session ).from( dataDir + "/" + dataFile ).now().string
// :fs cat target-path
String path = args.get(1);
return cat(mounts, path);
}
else if (args.get(0).equalsIgnoreCase("mkdir")) {
// println Hdfs.mkdir( session ).dir( directoryPath ).perm( "777" ).now().string
// :fs mkdir target-path [perms]
String path = args.get(1);
String perms = null;
if (args.size() == 3) {
perms = args.get(2);
}
return mkdir(mounts, path, perms);
}
else if (args.get(0).equalsIgnoreCase("get")) {
// println Hdfs.get( session ).from( dataDir + "/" + dataFile ).now().string
// :fs get from-path [to-path]
String path = args.get(1);
String mountPoint = determineMountPoint(path);
KnoxSession session = getSessionForMountPoint(mounts, mountPoint);
if (session != null) {
String from = determineTargetPath(path, mountPoint);
String to = null;
if (args.size() > 2) {
to = args.get(2);
}
else {
to = System.getProperty("user.home") + File.separator +
path.substring(path.lastIndexOf(File.separator));
}
return get(mountPoint, from, to);
}
else {
return "No session established for mountPoint: " + mountPoint + " Use :fs mount {topology-url} {mountpoint-name}";
}
}
else {
System.out.println("Unknown filesystem command");
System.out.println(getUsage());
}
return "";
}
private String get(String mountPoint, String from, String to) {
String result = null;
try {
Hdfs.get(sessions.get(mountPoint)).from(from).file(to).now().getString();
result = "Successfully copied: " + from + " to: " + to;
} catch (KnoxShellException | IOException e) {
e.printStackTrace();
result = "Exception ocurred: " + e.getMessage();
}
return result;
}
private String mkdir(Map<String, String> mounts, String path, String perms) {
String result = null;
String mountPoint = determineMountPoint(path);
KnoxSession session = getSessionForMountPoint(mounts, mountPoint);
if (session != null) {
String targetPath = determineTargetPath(path, mountPoint);
if (!exists(session, targetPath)) {
try {
if (perms != null) {
Hdfs.mkdir(sessions.get(mountPoint)).dir(targetPath).now().getString();
}
else {
Hdfs.mkdir(session).dir(targetPath).perm(perms).now().getString();
}
result = "Successfully created directory: " + targetPath;
} catch (KnoxShellException | IOException e) {
e.printStackTrace();
result = "Exception ocurred: " + e.getMessage();
}
}
else {
result = targetPath + " already exists";
}
}
else {
result = "No session established for mountPoint: " + mountPoint + " Use :fs mount {topology-url} {mountpoint-name}";
}
return result;
}
private String cat(Map<String, String> mounts, String path) {
String response = null;
String mountPoint = determineMountPoint(path);
KnoxSession session = getSessionForMountPoint(mounts, mountPoint);
if (session != null) {
String targetPath = determineTargetPath(path, mountPoint);
try {
String contents = Hdfs.get(session).from(targetPath).now().getString();
response = contents;
} catch (KnoxShellException | IOException e) {
e.printStackTrace();
response = "Exception ocurred: " + e.getMessage();
}
}
else {
response = "No session established for mountPoint: " + mountPoint + " Use :fs mount {topology-url} {mountpoint-name}";
}
return response;
}
private String remove(Map<String, String> mounts, String path) {
String mountPoint = determineMountPoint(path);
KnoxSession session = getSessionForMountPoint(mounts, mountPoint);
if (session != null) {
String targetPath = determineTargetPath(path, mountPoint);
try {
Hdfs.rm(session).file(targetPath).now().getString();
} catch (KnoxShellException | IOException e) {
e.printStackTrace();
}
}
else {
return "No session established for mountPoint: " + mountPoint + " Use :fs mount {topology-url} {mountpoint-name}";
}
return "Successfully removed: " + path;
}
private String put(Map<String, String> mounts, String localFile, String path, int permission) {
String mountPoint = determineMountPoint(path);
KnoxSession session = getSessionForMountPoint(mounts, mountPoint);
if (session != null) {
String targetPath = determineTargetPath(path, mountPoint);
try {
boolean overwrite = false;
if (exists(session, targetPath)) {
if (collectClearInput(targetPath + " already exists would you like to overwrite (Y/n)").equalsIgnoreCase("y")) {
overwrite = true;
}
}
Hdfs.put(session).file(localFile).to(targetPath).overwrite(overwrite).permission(permission).now().getString();
} catch (IOException e) {
e.printStackTrace();
return "Exception ocurred: " + e.getMessage();
}
}
else {
return "No session established for mountPoint: " + mountPoint + " Use :fs mount {topology-url} {mountpoint-name}";
}
return "Successfully put: " + localFile + " to: " + path;
}
private boolean exists(KnoxSession session, String path) {
boolean rc = false;
try {
Response response = Hdfs.status(session).file(path).now();
rc = response.exists();
} catch (KnoxShellException e) {
// NOP
}
return rc;
}
private Object listStatus(Map<String, String> mounts, String path) {
Object response = null;
try {
String directory;
String mountPoint = determineMountPoint(path);
if (mountPoint != null) {
KnoxSession session = getSessionForMountPoint(mounts, mountPoint);
if (session != null) {
directory = determineTargetPath(path, mountPoint);
String json = Hdfs.ls(session).dir(directory).now().getString();
Map<String,HashMap<String, ArrayList<HashMap<String, String>>>> map =
JsonUtils.getFileStatusesAsMap(json);
if (map != null) {
ArrayList<HashMap<String, String>> list = map.get("FileStatuses").get("FileStatus");
KnoxShellTable table = buildTableFromListStatus(directory, list);
response = table;
}
}
else {
response = "No session established for mountPoint: " + mountPoint + " Use :fs mount {topology-url} {mountpoint-name}";
}
}
else {
response = "No mountpoint found. Use ':fs mount {topologyURL} {mountpoint}'.";
}
} catch (KnoxShellException | IOException e) {
response = "Exception ocurred: " + e.getMessage();
e.printStackTrace();
}
return response;
}
private KnoxShellTable listMounts(Map<String, String> mounts) {
KnoxShellTable table = new KnoxShellTable();
table.header("Mount Point").header("Topology URL");
for (String mountPoint : mounts.keySet()) {
table.row().value(mountPoint).value(mounts.get(mountPoint));
}
return table;
}
private void unmount(Map<String, String> mounts, String mountPoint) {
sessions.remove(mountPoint);
mounts.remove(mountPoint);
KnoxSession.persistMountPoints(mounts);
}
private String mount(Map<String, String> mounts, String url, String mountPoint) {
KnoxSession session = establishSession(mountPoint, url);
if (session != null) {
mounts.put(mountPoint, url);
KnoxSession.persistMountPoints(mounts);
return url + " mounted as " + mountPoint;
}
return "Failed to mount " + url + " as " + mountPoint;
}
private KnoxSession getSessionForMountPoint(Map<String, String> mounts, String mountPoint) {
KnoxSession session = sessions.get(mountPoint);
if (session == null) {
String url = mounts.get(mountPoint);
if (url != null) {
session = establishSession(mountPoint, url);
}
}
return session;
}
private KnoxSession establishSession(String mountPoint, String url) {
CredentialCollector dlg;
try {
dlg = login();
} catch (CredentialCollectionException e) {
e.printStackTrace();
return null;
}
String username = dlg.name();
String password = new String(dlg.chars());
KnoxSession session = null;
try {
session = KnoxSession.login(url, username, password);
sessions.put(mountPoint, session);
} catch (URISyntaxException e) {
e.printStackTrace();
}
return session;
}
private String collectClearInput(String prompt) {
Console c = System.console();
if (c == null) {
System.err.println("No console.");
System.exit(1);
}
String value = c.readLine(prompt);
return value;
}
private String determineTargetPath(String path, String mountPoint) {
String directory = null;
if (path.startsWith("/")) {
directory = stripMountPoint(path, mountPoint);
}
return directory;
}
private String stripMountPoint(String path, String mountPoint) {
String newPath = path.replace("/" + mountPoint, "");
return newPath;
}
private String determineMountPoint(String path) {
String mountPoint = null;
if (path.startsWith("/")) {
// does the user supplied path starts at a root
// if so check for a mountPoint based on the first element of the path
String[] pathElements = path.split("/");
mountPoint = pathElements[1];
}
return mountPoint;
}
private KnoxShellTable buildTableFromListStatus(String directory, List<HashMap<String, String>> list) {
Calendar cal = Calendar.getInstance(TimeZone.getDefault(), Locale.getDefault());
KnoxShellTable table = new KnoxShellTable();
table.title(directory);
table.header("permission")
.header("owner")
.header("group")
.header("length")
.header("modtime")
.header("name");
for (Map<String, String> map : list) {
cal.setTimeInMillis(Long.parseLong(map.get("modificationTime")));
table.row()
.value(map.get("permission"))
.value(map.get("owner"))
.value(map.get("group"))
.value(map.get("length"))
.value(cal.getTime())
.value(map.get("pathSuffix"));
}
return table;
}
protected Map<String, String> getMountPoints() {
Map<String, String> mounts = null;
try {
mounts = KnoxSession.loadMountPoints();
} catch (IOException e) {
e.printStackTrace();
}
return mounts;
}
public static void main(String[] args) {
WebHDFSCommand cmd = new WebHDFSCommand(new Groovysh());
cmd.execute(new ArrayList<>(Arrays.asList(args)));
}
}