#!/usr/bin/env python | |
# Licensed to the Apache Software Foundation (ASF) under one | |
# or more contributor license agreements. See the NOTICE file | |
# distributed with this work for additional information | |
# regarding copyright ownership. The ASF licenses this file | |
# to you under the Apache License, Version 2.0 (the | |
# "License"); you may not use this file except in compliance | |
# with the License. You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, | |
# software distributed under the License is distributed on an | |
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
# KIND, either express or implied. See the License for the | |
# specific language governing permissions and limitations | |
# under the License. | |
''' | |
gpfdist -- file distribution web server | |
Usage: gpfdist [-?v] [-p port] [-d dir] [-q qchar] [-x xchar] [-h] [-l logfile] | |
-? : print this help screen | |
-v : verbose mode | |
-p port : which port to serve HTTP. Default to 8080 | |
-d dir : serve files under the specified directory. | |
Default to '.' | |
-q qchar : set quote character. If not specified, the | |
program will not parse for CSV, and will | |
assume each record is separated by a newline | |
character that never occurs inside a record. | |
-x xchar : set escape character (default to qchar) | |
-h : first data row in data is a header row. skip | |
it (only allowed in CSV format). | |
-l : fully qualified path and name of log file | |
''' | |
import SocketServer, BaseHTTPServer, os, sys, getopt, threading, time, socket | |
MAX_CONCURRENT_SESSION = 64 | |
opt = {} | |
opt['-p'] = 8080 | |
opt['-v'] = False | |
opt['-V'] = False | |
opt['-h'] = False | |
opt['-d'] = '.' | |
qc = None | |
xc = None | |
def usage(exitarg): | |
print __doc__ | |
sys.exit(exitarg) | |
def parseInt(val): | |
try: return int(val) | |
except ValueError: return 0 | |
def parseCommandLine(): | |
global opt, qc, xc | |
try: | |
(options, args) = getopt.getopt(sys.argv[1:], '?Vvhp:d:q:x:') | |
except Exception, e: | |
usage('Error: ' + str(e)) | |
for (switch, val) in options: | |
if (switch == '-?'): usage(0) | |
elif (switch[1] in 'Vvh'): opt[switch] = True | |
elif (switch[1] in 'dqx'): opt[switch] = val | |
elif (switch[1] in 'p'): opt[switch] = parseInt(val) | |
elif (switch == '-q'): qc = val; opt['-f'] = False | |
elif (switch == '-x'): xc = val; opt['-f'] = False | |
if not opt['-p'] > 0: | |
usage('Error: please specify port number for -p switch') | |
if not os.path.isdir(opt['-d']): | |
usage('Error: please specify a directory for -d switch') | |
opt['-d'] = os.path.abspath(opt['-d']) | |
if (opt['-d'] == '/'): | |
usage('Security Error: cannot run under root (/) directory') | |
if '-q' in opt: qc = opt['-q']; opt['-f'] = False | |
if '-x' in opt: xc = opt['-x']; opt['-f'] = False | |
if qc and len(qc) != 1: | |
usage('Error: please specify a character for -q switch') | |
if xc and len(xc) != 1: | |
usage('Error: please specify a character for -x switch') | |
if not qc and xc: | |
usage('Error: you must specify -q qchar with -x xchar') | |
if not qc and opt['-h']: | |
usage('Error: header may only be used in CSV format. please specify -q switch') | |
if len(args) != 0: | |
usage(1) | |
if opt['-V']: opt['-v'] = true | |
# a File Session - shared among all GET request threads | |
class Session: | |
def __init__(self, fd, fname): | |
self.m_fname = fname | |
self.m_fd = fd | |
self.m_sem = threading.Semaphore(1) | |
self.m_residue = None | |
self.m_off = 0 | |
self.m_max = 0 | |
self.m_linecnt = 0 | |
self.m_threadcnt = 0 | |
def readLine(self): | |
line = '' | |
inQuote = False | |
lastWasEsc = False | |
self.m_sem.acquire() | |
start = self.m_off | |
try: | |
while True: | |
if not self.m_residue: | |
if not self.m_fd: break | |
self.m_residue = self.m_fd.read(1024*64) | |
if not self.m_residue: | |
self.m_fd.close() | |
self.m_fd = None | |
break | |
start = 0 | |
self.m_off = 0 | |
self.m_max = len(self.m_residue) | |
elif (self.m_off >= self.m_max): | |
line = line + self.m_residue[start:] | |
self.m_residue = None | |
continue | |
c = self.m_residue[self.m_off] | |
self.m_off = self.m_off + 1 | |
if c == '\n' and not inQuote: | |
line = line + self.m_residue[start:self.m_off] | |
break | |
if inQuote and c == xc: | |
lastWasEsc = not lastWasEsc | |
if c == qc and not lastWasEsc: | |
inQuote = not inQuote | |
if c != xc: | |
lastWasEsc = False | |
finally: | |
self.m_sem.release() | |
self.m_linecnt = self.m_linecnt + 1 | |
if self.m_linecnt % 10000 == 0: | |
print self.m_linecnt, 'lines' | |
return line | |
def readFile(self): | |
lines = [] | |
self.m_sem.acquire() | |
try: | |
if self.m_fd: | |
if self.m_residue: | |
lines.append(self.m_residue); | |
self.m_residue = None | |
while not self.m_residue and self.m_fd: | |
chunk = self.m_fd.read(1024*64) | |
if not chunk: | |
self.m_fd.close() | |
self.m_fd = None | |
else: | |
c = chunk.split('\n', 1) | |
if (len(c) == 1): | |
lines.append(c[0]) | |
else: | |
lines.append(c[0]) | |
lines.append('\n') | |
self.m_residue = c[1] | |
finally: | |
self.m_sem.release() | |
return lines | |
# | |
# Session dictionary | |
# sess[TID][fname] is a Session object | |
# | |
sess = {} | |
sessSem = threading.Semaphore(1) | |
def findSession(TID, fname): | |
global sess, sessSem | |
key = (TID, fname) | |
sessSem.acquire() | |
try: | |
if key not in sess: | |
fd = open(fname, 'r', 1024*1024) | |
sess[key] = Session(fd, fname) | |
if opt['-v']: print '[INFO] initiated session', key | |
else: | |
if opt['-v']: print '[INFO] joined session', key | |
finally: | |
sessSem.release(); | |
return sess[key] | |
# | |
# Class to handle individual request | |
# | |
class GPFDistRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): | |
counter = 0 | |
def log_request(self, code): | |
print "%s - %s" % (self.client_address[0], code) | |
def send400(self, msg): | |
print 'ERROR: %s' % msg | |
self.send_response(400) | |
return None | |
def send200empty(self): | |
if opt['-v']: print ' [ignore] thread %s' % self.client_address[0] | |
self.send_response(200) | |
self.send_header("Content-type", "text/plain") | |
self.send_header("Content-length", "0") | |
self.end_headers() | |
return None | |
def do_GET(self): | |
try: | |
GPFDistRequestHandler.counter = GPFDistRequestHandler.counter + 1 | |
TID = self.headers.getheader('X-GP-TID') | |
if not TID: | |
# start an non-transaction session | |
TID = 'auto-tid.' + str(GPFDistRequestHandler.counter) | |
fname = self.path | |
if fname.find('/') == 0: fname = fname[1:] | |
fname = os.path.join(opt['-d'], fname) | |
fname = os.path.normpath(fname) | |
if fname.find(opt['-d']) != 0: | |
msg = 'bad path specified (%s)' % (self.path) | |
return self.send400(msg) | |
try: | |
s = findSession(TID, fname) | |
if not s: | |
msg = 'unable to serve TID %s' % TID | |
return self.send400(msg) | |
if s.m_threadcnt >= MAX_CONCURRENT_SESSION: | |
return send200empty(self) | |
s.m_threadcnt = s.m_threadcnt + 1 | |
except IOError, e: | |
msg = str(e) | |
return self.send400(msg) | |
self.send_response(200) | |
self.send_header("Content-type", "text/plain") | |
self.end_headers() | |
# parse in CSV format if quote char was specified | |
if qc: | |
# skip the first line if 'header' option was specified | |
if opt['-h']: | |
line = s.readLine() | |
while 1: | |
line = s.readLine() | |
if not line: break | |
self.wfile.write(line) | |
# parse in text format otherwise | |
else: | |
while 1: | |
chunks = s.readFile() | |
if not chunks: break | |
for c in chunks: | |
self.wfile.write(c) | |
s.m_threadcnt = s.m_threadcnt - 1 | |
if opt['-V']: print ' %s done' % fname | |
if TID[0] == 'a' and TID.find('auto-tid.') == 0: | |
del sess[(TID, fname)] | |
except socket.error, e: | |
print 'socket error: ', str(e), 'while serving', self.path | |
class GPFDistServer(SocketServer.ThreadingTCPServer): | |
allow_reuse_address = 1 | |
request_queue_size = 256 | |
def server_bind(self): | |
SocketServer.ThreadingTCPServer.server_bind(self) | |
host, port = self.socket.getsockname()[:2] | |
self.server_name = socket.getfqdn(host) | |
self.server_post = port | |
try: | |
parseCommandLine() | |
serverAddress = ('', opt['-p']) | |
GPFDistRequestHandler.protocol_version = "HTTP/1.0" | |
httpd = GPFDistServer(serverAddress, GPFDistRequestHandler) | |
sa = httpd.socket.getsockname() | |
print "Serving HTTP on %s:%d, directory %s ..." % (sa[0], sa[1], os.path.abspath(opt['-d'])) | |
httpd.serve_forever() | |
except KeyboardInterrupt: | |
sys.exit('[Interrupted ...]') | |