blob: 990d5724ffcd9a945f4dee38d546900c2334df9f [file] [log] [blame]
package org.apache.hadoop.thriftfs;
import com.facebook.thrift.TException;
import com.facebook.thrift.TApplicationException;
import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.protocol.TProtocol;
import com.facebook.thrift.server.TServer;
import com.facebook.thrift.server.TThreadPoolServer;
import com.facebook.thrift.transport.TServerSocket;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransportFactory;
// Include Generated code
import org.apache.hadoop.thriftfs.api.*;
import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
import java.io.*;
import java.util.*;
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hdfs.HdfsConfiguration;
/**
* ThriftHadoopFileSystem
* A thrift wrapper around the Hadoop File System
*/
public class HadoopThriftServer extends ThriftHadoopFileSystem {
static int serverPort = 0; // default port
TServer server = null;
public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface
{
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");
// HDFS glue
Configuration conf;
FileSystem fs;
// stucture that maps each Thrift object into an hadoop object
private long nextId = new Random().nextLong();
private HashMap<Long, Object> hadoopHash = new HashMap<Long, Object>();
private Daemon inactivityThread = null;
// Detect inactive session
private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
private static volatile long inactivityRecheckInterval = 60 * 1000;
private static volatile boolean fsRunning = true;
private static long now;
// allow outsider to change the hadoopthrift path
public void setOption(String key, String val) {
}
/**
* Current system time.
* @return current time in msec.
*/
static long now() {
return System.currentTimeMillis();
}
/**
* getVersion
*
* @return current version of the interface.
*/
public String getVersion() {
return "0.1";
}
/**
* shutdown
*
* cleanly closes everything and exit.
*/
public void shutdown(int status) {
LOG.info("HadoopThriftServer shutting down.");
try {
fs.close();
} catch (IOException e) {
LOG.warn("Unable to close file system");
}
Runtime.getRuntime().exit(status);
}
/**
* Periodically checks to see if there is inactivity
*/
class InactivityMonitor implements Runnable {
public void run() {
while (fsRunning) {
try {
if (now() > now + inactivityPeriod) {
LOG.warn("HadoopThriftServer Inactivity period of " +
inactivityPeriod + " expired... Stopping Server.");
shutdown(-1);
}
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
try {
Thread.sleep(inactivityRecheckInterval);
} catch (InterruptedException ie) {
}
}
}
}
/**
* HadoopThriftServer
*
* Constructor for the HadoopThriftServer glue with Thrift Class.
*
* @param name - the name of this handler
*/
public HadoopThriftHandler(String name) {
conf = new HdfsConfiguration();
now = now();
try {
inactivityThread = new Daemon(new InactivityMonitor());
fs = FileSystem.get(conf);
} catch (IOException e) {
LOG.warn("Unable to open hadoop file system...");
Runtime.getRuntime().exit(-1);
}
}
/**
* printStackTrace
*
* Helper function to print an exception stack trace to the log and not stderr
*
* @param e the exception
*
*/
static private void printStackTrace(Exception e) {
for(StackTraceElement s: e.getStackTrace()) {
LOG.error(s);
}
}
/**
* Lookup a thrift object into a hadoop object
*/
private synchronized Object lookup(long id) {
return hadoopHash.get(new Long(id));
}
/**
* Insert a thrift object into a hadoop object. Return its id.
*/
private synchronized long insert(Object o) {
nextId++;
hadoopHash.put(nextId, o);
return nextId;
}
/**
* Delete a thrift object from the hadoop store.
*/
private synchronized Object remove(long id) {
return hadoopHash.remove(new Long(id));
}
/**
* Implement the API exported by this thrift server
*/
/** Set inactivity timeout period. The period is specified in seconds.
* if there are no RPC calls to the HadoopThrift server for this much
* time, then the server kills itself.
*/
public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
inactivityPeriod = periodInSeconds * 1000; // in milli seconds
if (inactivityRecheckInterval > inactivityPeriod ) {
inactivityRecheckInterval = inactivityPeriod;
}
}
/**
* Create a file and open it for writing
*/
public ThriftHandle create(Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("create: " + path);
FSDataOutputStream out = fs.create(new Path(path.pathname));
long id = insert(out);
ThriftHandle obj = new ThriftHandle(id);
HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
return obj;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Create a file and open it for writing, delete file if it exists
*/
public ThriftHandle createFile(Pathname path,
short mode,
boolean overwrite,
int bufferSize,
short replication,
long blockSize) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("create: " + path +
" permission: " + mode +
" overwrite: " + overwrite +
" bufferSize: " + bufferSize +
" replication: " + replication +
" blockSize: " + blockSize);
FSDataOutputStream out = fs.create(new Path(path.pathname),
new FsPermission(mode),
overwrite,
bufferSize,
replication,
blockSize,
null); // progress
long id = insert(out);
ThriftHandle obj = new ThriftHandle(id);
HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
return obj;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Opens an existing file and returns a handle to read it
*/
public ThriftHandle open(Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("open: " + path);
FSDataInputStream out = fs.open(new Path(path.pathname));
long id = insert(out);
ThriftHandle obj = new ThriftHandle(id);
HadoopThriftHandler.LOG.debug("opened: " + path + " id: " + id);
return obj;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Opens an existing file to append to it.
*/
public ThriftHandle append(Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("append: " + path);
FSDataOutputStream out = fs.append(new Path(path.pathname));
long id = insert(out);
ThriftHandle obj = new ThriftHandle(id);
HadoopThriftHandler.LOG.debug("appended: " + path + " id: " + id);
return obj;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* write to a file
*/
public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("write: " + tout.id);
FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
byte[] tmp = data.getBytes("UTF-8");
out.write(tmp, 0, tmp.length);
HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
return true;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* read from a file
*/
public String read(ThriftHandle tout, long offset,
int length) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("read: " + tout.id +
" offset: " + offset +
" length: " + length);
FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
if (in.getPos() != offset) {
in.seek(offset);
}
byte[] tmp = new byte[length];
int numbytes = in.read(offset, tmp, 0, length);
HadoopThriftHandler.LOG.debug("read done: " + tout.id);
return new String(tmp, 0, numbytes, "UTF-8");
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Delete a file/directory
*/
public boolean rm(Pathname path, boolean recursive)
throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("rm: " + path +
" recursive: " + recursive);
boolean ret = fs.delete(new Path(path.pathname), recursive);
HadoopThriftHandler.LOG.debug("rm: " + path);
return ret;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Move a file/directory
*/
public boolean rename(Pathname path, Pathname dest)
throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("rename: " + path +
" destination: " + dest);
boolean ret = fs.rename(new Path(path.pathname),
new Path(dest.pathname));
HadoopThriftHandler.LOG.debug("rename: " + path);
return ret;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* close file
*/
public boolean close(ThriftHandle tout) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("close: " + tout.id);
Object obj = remove(tout.id);
if (obj instanceof FSDataOutputStream) {
FSDataOutputStream out = (FSDataOutputStream)obj;
out.close();
} else if (obj instanceof FSDataInputStream) {
FSDataInputStream in = (FSDataInputStream)obj;
in.close();
} else {
throw new ThriftIOException("Unknown thrift handle.");
}
HadoopThriftHandler.LOG.debug("closed: " + tout.id);
return true;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Create a directory
*/
public boolean mkdirs(Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("mkdirs: " + path);
boolean ret = fs.mkdirs(new Path(path.pathname));
HadoopThriftHandler.LOG.debug("mkdirs: " + path);
return ret;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Does this pathname exist?
*/
public boolean exists(Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("exists: " + path);
boolean ret = fs.exists(new Path(path.pathname));
HadoopThriftHandler.LOG.debug("exists done: " + path);
return ret;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Returns status about the specified pathname
*/
public org.apache.hadoop.thriftfs.api.FileStatus stat(
Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("stat: " + path);
org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
new Path(path.pathname));
HadoopThriftHandler.LOG.debug("stat done: " + path);
return new org.apache.hadoop.thriftfs.api.FileStatus(
stat.getPath().toString(),
stat.getLen(),
stat.isDirectory(),
stat.getReplication(),
stat.getBlockSize(),
stat.getModificationTime(),
stat.getPermission().toString(),
stat.getOwner(),
stat.getGroup());
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* If the specified pathname is a directory, then return the
* list of pathnames in this directory
*/
public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("listStatus: " + path);
org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
new Path(path.pathname));
HadoopThriftHandler.LOG.debug("listStatus done: " + path);
org.apache.hadoop.thriftfs.api.FileStatus tmp;
List<org.apache.hadoop.thriftfs.api.FileStatus> value =
new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>();
for (int i = 0; i < stat.length; i++) {
tmp = new org.apache.hadoop.thriftfs.api.FileStatus(
stat[i].getPath().toString(),
stat[i].getLen(),
stat[i].isDirectory(),
stat[i].getReplication(),
stat[i].getBlockSize(),
stat[i].getModificationTime(),
stat[i].getPermission().toString(),
stat[i].getOwner(),
stat[i].getGroup());
value.add(tmp);
}
return value;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Sets the permission of a pathname
*/
public void chmod(Pathname path, short mode) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("chmod: " + path +
" mode " + mode);
fs.setPermission(new Path(path.pathname), new FsPermission(mode));
HadoopThriftHandler.LOG.debug("chmod done: " + path);
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Sets the owner & group of a pathname
*/
public void chown(Pathname path, String owner, String group)
throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("chown: " + path +
" owner: " + owner +
" group: " + group);
fs.setOwner(new Path(path.pathname), owner, group);
HadoopThriftHandler.LOG.debug("chown done: " + path);
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Sets the replication factor of a file
*/
public void setReplication(Pathname path, short repl) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("setrepl: " + path +
" replication factor: " + repl);
fs.setReplication(new Path(path.pathname), repl);
HadoopThriftHandler.LOG.debug("setrepl done: " + path);
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
/**
* Returns the block locations of this file
*/
public List<org.apache.hadoop.thriftfs.api.BlockLocation>
getFileBlockLocations(Pathname path, long start, long length)
throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path);
org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
new Path(path.pathname));
org.apache.hadoop.fs.BlockLocation[] stat =
fs.getFileBlockLocations(status, start, length);
HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path);
org.apache.hadoop.thriftfs.api.BlockLocation tmp;
List<org.apache.hadoop.thriftfs.api.BlockLocation> value =
new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();
for (int i = 0; i < stat.length; i++) {
// construct the list of hostnames from the array returned
// by HDFS
List<String> hosts = new LinkedList<String>();
String[] hostsHdfs = stat[i].getHosts();
for (int j = 0; j < hostsHdfs.length; j++) {
hosts.add(hostsHdfs[j]);
}
// construct the list of host:port from the array returned
// by HDFS
List<String> names = new LinkedList<String>();
String[] namesHdfs = stat[i].getNames();
for (int j = 0; j < namesHdfs.length; j++) {
names.add(namesHdfs[j]);
}
tmp = new org.apache.hadoop.thriftfs.api.BlockLocation(
hosts, names, stat[i].getOffset(), stat[i].getLength());
value.add(tmp);
}
return value;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
}
// Bind to port. If the specified port is 0, then bind to random port.
private ServerSocket createServerSocket(int port) throws IOException {
try {
ServerSocket sock = new ServerSocket();
// Prevent 2MSL delay problem on server restarts
sock.setReuseAddress(true);
// Bind to listening port
if (port == 0) {
sock.bind(null);
serverPort = sock.getLocalPort();
} else {
sock.bind(new InetSocketAddress(port));
}
return sock;
} catch (IOException ioe) {
throw new IOException("Could not create ServerSocket on port " + port + "." +
ioe);
}
}
/**
* Constrcts a server object
*/
public HadoopThriftServer(String [] args) {
if (args.length > 0) {
serverPort = new Integer(args[0]);
}
try {
ServerSocket ssock = createServerSocket(serverPort);
TServerTransport serverTransport = new TServerSocket(ssock);
Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler);
TThreadPoolServer.Options options = new TThreadPoolServer.Options();
options.minWorkerThreads = 10;
server = new TThreadPoolServer(processor, serverTransport,
new TTransportFactory(),
new TTransportFactory(),
new TBinaryProtocol.Factory(),
new TBinaryProtocol.Factory(),
options);
System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +serverPort + "]...");
System.out.flush();
} catch (Exception x) {
x.printStackTrace();
}
}
public static void main(String [] args) {
HadoopThriftServer me = new HadoopThriftServer(args);
me.server.serve();
}
};