package org.apache.samza.util.hadoop
import java.io.IOException
import java.net.URI

import org.apache.commons.httpclient.methods.GetMethod
import org.apache.commons.httpclient.HttpClient
import org.apache.commons.httpclient.HttpStatus
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.Progressable
import org.apache.samza.util.Logging
/**
 * A Hadoop `FileSystem` whose `open` fetches the requested path with an
 * HTTP GET and exposes the response body as an `FSDataInputStream`.
 *
 * Write and directory operations are not implemented: `create`, `append`
 * and `listStatus` return `null`, while `rename`, `delete` and `mkdirs`
 * return `false`. `getFileStatus` returns a fixed placeholder status and
 * does not contact the server.
 */
class HttpFileSystem extends FileSystem with Logging {
  /** Block size reported by `getFileStatus`. */
  val DEFAULT_BLOCK_SIZE = 4 * 1024

  /** URI handed to `initialize`; stays `null` until `initialize` has run. */
  var uri: URI = null

  /**
   * Initializes the underlying `FileSystem` and remembers the URI so that
   * `getUri` can return it.
   *
   * @param uri  URI identifying this file system
   * @param conf Hadoop configuration forwarded to the superclass
   */
  override def initialize(uri: URI, conf: Configuration): Unit = {
    super.initialize(uri, conf)
    debug("init uri %s" format (uri))
    this.uri = uri
  }

  /** @return the URI passed to `initialize`, or `null` if not yet initialized */
  override def getUri: URI = uri

  /**
   * Opens `f` by issuing an HTTP GET against its URI.
   *
   * @param f          path whose URI is requested
   * @param bufferSize not used by this implementation
   * @return a stream over the HTTP response body
   * @throws IOException if the server answers with a status other than 200 (OK)
   */
  override def open(f: Path, bufferSize: Int): FSDataInputStream = {
    debug("open http file %s" format (f))
    val client = new HttpClient
    val method = new GetMethod(f.toUri.toString)
    try {
      val statusCode = client.executeMethod(method)
      if (statusCode != HttpStatus.SC_OK) {
        // Log the URI that was actually requested rather than the file
        // system's base URI, so the warning matches the exception below.
        warn("got status code %d for uri %s" format (statusCode, f.toUri))
        throw new IOException("Bad status code returned by http server " + f + ": " + statusCode)
      }
      new FSDataInputStream(new HttpInputStream(method.getResponseBodyAsStream))
    } catch {
      case e: Exception =>
        // No stream is handed to the caller on failure, so nobody else can
        // release the connection; do it here before propagating the error.
        method.releaseConnection()
        throw e
    }
  }

  /** Not implemented: always returns `null`. */
  override def create(
    f: Path,
    permission: FsPermission,
    overwrite: Boolean,
    bufferSize: Int,
    replication: Short,
    blockSize: Long,
    progress: Progressable): FSDataOutputStream = null

  /** Not implemented: always returns `null`. */
  override def append(f: Path, bufferSize: Int, progress: Progressable): FSDataOutputStream = null

  /** Not implemented: always returns `false`. */
  override def rename(src: Path, dst: Path): Boolean = false

  /** Not implemented: always returns `false`. */
  override def delete(f: Path, recursive: Boolean): Boolean = false

  /** Not implemented: always returns `null`. */
  override def listStatus(f: Path): Array[FileStatus] = null

  /** No-op: the working directory is fixed (see `getWorkingDirectory`). */
  override def setWorkingDirectory(newDir: Path): Unit = {}

  /** @return always the root path `/` */
  override def getWorkingDirectory(): Path = new Path("/")

  /** Not implemented: always returns `false`. */
  override def mkdirs(f: Path, permission: FsPermission): Boolean = false

  /**
   * Builds a placeholder status for `f` without contacting the server:
   * length -1, not a directory, replication 1, block size
   * `DEFAULT_BLOCK_SIZE`, modification time 0.
   *
   * @param f path the status is reported for
   * @return the placeholder `FileStatus`
   */
  override def getFileStatus(f: Path): FileStatus = {
    val length = -1L
    val isDir = false
    val blockReplication = 1
    val blockSize = DEFAULT_BLOCK_SIZE.toLong
    val modTime = 0L
    val fs = new FileStatus(length, isDir, blockReplication, blockSize, modTime, f)
    debug("file status for %s is %s" format (f, fs))
    fs
  }
}