blob: e64c67485bea2af872897358bc23faeb0d6a630d [file] [log] [blame]
#!/usr/bin/env python
##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Command-line tool
NOTE: The API for the command-line tool is experimental.
"""
from __future__ import absolute_import, division, print_function
import os.path
import sys
import threading
import warnings
import avro.io
from avro import datafile, ipc, protocol
try:
import BaseHTTPServer as http_server # type: ignore
except ImportError:
import http.server as http_server # type: ignore
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse # type: ignore
class GenericResponder(ipc.Responder):
def __init__(self, proto, msg, datum):
proto_json = open(proto, 'rb').read()
ipc.Responder.__init__(self, protocol.parse(proto_json))
self.msg = msg
self.datum = datum
def invoke(self, message, request):
if message.name == self.msg:
print("Message: %s Datum: %s" % (message.name, self.datum), file=sys.stderr)
# server will shut down after processing a single Avro request
global server_should_shutdown
server_should_shutdown = True
return self.datum
class GenericHandler(http_server.BaseHTTPRequestHandler):
def do_POST(self):
self.responder = responder
call_request_reader = ipc.FramedReader(self.rfile)
call_request = call_request_reader.read_framed_message()
resp_body = self.responder.respond(call_request)
self.send_response(200)
self.send_header('Content-Type', 'avro/binary')
self.end_headers()
resp_writer = ipc.FramedWriter(self.wfile)
resp_writer.write_framed_message(resp_body)
if server_should_shutdown:
print("Shutting down server.", file=sys.stderr)
quitter = threading.Thread(target=self.server.shutdown)
quitter.daemon = True
quitter.start()
def run_server(uri, proto, msg, datum):
url_obj = urlparse(uri)
server_addr = (url_obj.hostname, url_obj.port)
global responder
global server_should_shutdown
server_should_shutdown = False
responder = GenericResponder(proto, msg, datum)
server = http_server.HTTPServer(server_addr, GenericHandler)
print("Port: %s" % server.server_port)
sys.stdout.flush()
server.allow_reuse_address = True
print("Starting server.", file=sys.stderr)
server.serve_forever()
def send_message(uri, proto, msg, datum):
url_obj = urlparse(uri)
client = ipc.HTTPTransceiver(url_obj.hostname, url_obj.port)
proto_json = open(proto, 'rb').read()
requestor = ipc.Requestor(protocol.parse(proto_json), client)
print(requestor.request(msg, datum))
##
# TODO: Replace this with fileinput()
def file_or_stdin(f):
return sys.stdin if f == '-' else open(f, 'rb')
def main(args=sys.argv):
if len(args) == 1:
print("Usage: %s [dump|rpcreceive|rpcsend]" % args[0])
return 1
if args[1] == "dump":
if len(args) != 3:
print("Usage: %s dump input_file" % args[0])
return 1
for d in datafile.DataFileReader(file_or_stdin(args[2]), avro.io.DatumReader()):
print(repr(d))
elif args[1] == "rpcreceive":
usage_str = "Usage: %s rpcreceive uri protocol_file " % args[0]
usage_str += "message_name (-data d | -file f)"
if len(args) not in [5, 7]:
print(usage_str)
return 1
uri, proto, msg = args[2:5]
datum = None
if len(args) > 5:
if args[5] == "-file":
reader = open(args[6], 'rb')
datum_reader = avro.io.DatumReader()
dfr = datafile.DataFileReader(reader, datum_reader)
datum = next(dfr)
elif args[5] == "-data":
print("JSON Decoder not yet implemented.")
return 1
else:
print(usage_str)
return 1
run_server(uri, proto, msg, datum)
elif args[1] == "rpcsend":
usage_str = "Usage: %s rpcsend uri protocol_file " % args[0]
usage_str += "message_name (-data d | -file f)"
if len(args) not in [5, 7]:
print(usage_str)
return 1
uri, proto, msg = args[2:5]
datum = None
if len(args) > 5:
if args[5] == "-file":
reader = open(args[6], 'rb')
datum_reader = avro.io.DatumReader()
dfr = datafile.DataFileReader(reader, datum_reader)
datum = next(dfr)
elif args[5] == "-data":
print("JSON Decoder not yet implemented.")
return 1
else:
print(usage_str)
return 1
send_message(uri, proto, msg, datum)
return 0
if __name__ == "__main__":
if os.path.dirname(avro.io.__file__) in sys.path:
warnings.warn("Invoking avro/tool.py directly is likely to lead to a name collision "
"with the python io module. Try doing `python -m avro.tool` instead.")
sys.exit(main(sys.argv))