#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command line utlity for reading and writing Avro files."""
from avro.io import DatumReader, DatumWriter
from avro.datafile import DataFileReader, DataFileWriter
import avro.schema
try:
import json
except ImportError:
import simplejson as json
import csv
from sys import stdout, stdin
from itertools import ifilter, imap
from functools import partial
from os.path import splitext
class AvroError(Exception):
pass
def print_json(row):
print(json.dumps(row))
def print_json_pretty(row):
print(json.dumps(row, indent=4))
_write_row = csv.writer(stdout).writerow
_encoding = stdout.encoding or "UTF-8"
def _encode(v, encoding=_encoding):
if not isinstance(v, basestring):
return v
    return v.encode(encoding)
def print_csv(row):
    # We sort the keys so the fields will be in the same place
# FIXME: Do we want to do it in schema order?
_write_row([_encode(row[key]) for key in sorted(row)])
def select_printer(format):
return {
"json" : print_json,
"json-pretty" : print_json_pretty,
"csv" : print_csv
}[format]
def record_match(expr, record):
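    """Evaluate the --filter expression with the current record bound to `r`."""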
return eval(expr, None, {"r" : record})
def parse_fields(fields):
fields = fields or ''
if not fields.strip():
return None
return [field.strip() for field in fields.split(',') if field.strip()]
def field_selector(fields):
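    """Return a function that trims each record down to the requested fields."""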
fields = set(fields)
def keys_filter(obj):
return dict((k, obj[k]) for k in (set(obj) & fields))
return keys_filter
def print_avro(avro, opts):
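    """Print records from an open reader: apply --filter, --skip and --fields, then print up to --count records."""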
if opts.header and (opts.format != "csv"):
raise AvroError("--header applies only to CSV format")
# Apply filter first
if opts.filter:
avro = ifilter(partial(record_match, opts.filter), avro)
for i in xrange(opts.skip):
try:
next(avro)
except StopIteration:
return
fields = parse_fields(opts.fields)
if fields:
avro = imap(field_selector(fields), avro)
printer = select_printer(opts.format)
for i, record in enumerate(avro):
if i == 0 and opts.header:
_write_row(sorted(record.keys()))
if i >= opts.count:
break
printer(record)
def print_schema(avro):
schema = avro.meta["avro.schema"]
# Pretty print
    print(json.dumps(json.loads(schema), indent=4))
def cat(opts, args):
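    """Dump the records (or the schema, with --print-schema) of each Avro file given on the command line."""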
if not args:
raise AvroError("No files to show")
for filename in args:
try:
fo = open(filename, "rb")
except (OSError, IOError), e:
raise AvroError("Can't open %s - %s" % (filename, e))
avro = DataFileReader(fo, DatumReader())
if opts.print_schema:
print_schema(avro)
continue
print_avro(avro, opts)
def _open(filename, mode):
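    """Open filename for reading or writing; "-" maps to stdin/stdout."""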
if filename == "-":
return {
"rb" : stdin,
"wb" : stdout
}[mode]
return open(filename, mode)
def iter_json(info, _):
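    """Yield one record per input line, parsed as JSON (the schema argument is unused)."""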
return imap(json.loads, info)
# Converters for primitive Avro types; union members are resolved in convert_union.
_converters = {
    "int" : int,
    "long" : long,
    "float" : float,
    "double" : float,
    "string" : str,
    "bytes" : str,
    "boolean" : bool,
    "null" : lambda _: None,
}
def convert(value, field):
    type = field.type.type
    if type == "union":
        return convert_union(value, field)
    return _converters[type](value)
def convert_union(value, field):
    # Try each member schema of the union until one accepts the value.
    for schema in field.type.schemas:
        try:
            return _converters[schema.type](value)
        except (KeyError, TypeError, ValueError):
            continue
def iter_csv(info, schema):
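    """Yield one record per CSV row, converting each value to the type of the matching schema field."""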
header = [field.name for field in schema.fields]
for row in csv.reader(info):
values = [convert(v, f) for v, f in zip(row, schema.fields)]
yield dict(zip(header, values))
def guess_input_type(files):
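    """Guess "json" or "csv" from the extension of the first input file, or None if unknown."""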
if not files:
return None
ext = splitext(files[0])[1].lower()
if ext in (".json", ".js"):
return "json"
elif ext in (".csv",):
return "csv"
return None
def write(opts, files):
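    """Write the given JSON or CSV input files (or stdin) into an Avro data file using --schema."""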
if not opts.schema:
raise AvroError("No schema specified")
input_type = opts.input_type or guess_input_type(files)
if not input_type:
raise AvroError("Can't guess input file type (not .json or .csv)")
try:
schema = avro.schema.parse(open(opts.schema, "rb").read())
out = _open(opts.output, "wb")
except (IOError, OSError), e:
raise AvroError("Can't open file - %s" % e)
writer = DataFileWriter(out, DatumWriter(), schema)
iter_records = {"json" : iter_json, "csv" : iter_csv}[input_type]
for filename in (files or ["-"]):
info = _open(filename, "rb")
for record in iter_records(info, schema):
writer.append(record)
writer.close()
def main(argv=None):
import sys
from optparse import OptionParser, OptionGroup
argv = argv or sys.argv
parser = OptionParser(description="Display/write for Avro files",
version="@AVRO_VERSION@",
usage="usage: %prog cat|write [options] FILE [FILE...]")
# cat options
cat_options = OptionGroup(parser, "cat options")
cat_options.add_option("-n", "--count", default=float("Infinity"),
help="number of records to print", type=int)
cat_options.add_option("-s", "--skip", help="number of records to skip",
type=int, default=0)
cat_options.add_option("-f", "--format", help="record format",
default="json",
choices=["json", "csv", "json-pretty"])
cat_options.add_option("--header", help="print CSV header", default=False,
action="store_true")
cat_options.add_option("--filter", help="filter records (e.g. r['age']>1)",
default=None)
cat_options.add_option("--print-schema", help="print schema",
action="store_true", default=False)
cat_options.add_option('--fields', default=None,
help='fields to show, comma separated (show all by default)')
parser.add_option_group(cat_options)
# write options
write_options = OptionGroup(parser, "write options")
write_options.add_option("--schema", help="schema file (required)")
write_options.add_option("--input-type",
help="input file(s) type (json or csv)",
choices=["json", "csv"], default=None)
write_options.add_option("-o", "--output", help="output file", default="-")
parser.add_option_group(write_options)
opts, args = parser.parse_args(argv[1:])
if len(args) < 1:
parser.error("You much specify `cat` or `write`") # Will exit
command = args.pop(0)
try:
if command == "cat":
cat(opts, args)
elif command == "write":
write(opts, args)
else:
raise AvroError("Unknown command - %s" % command)
except AvroError, e:
parser.error("%s" % e) # Will exit
except Exception, e:
raise SystemExit("panic: %s" % e)
if __name__ == "__main__":
main()