| #!/usr/bin/env python |
| |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Command line utility for reading and writing Avro files.""" |
| |
| from __future__ import absolute_import, division, print_function |
| |
| import csv |
| import json |
| from functools import partial |
| from itertools import ifilter, imap |
| from os.path import splitext |
| from sys import stdin, stdout |
| |
| import avro.schema |
| from avro.datafile import DataFileReader, DataFileWriter |
| from avro.io import DatumReader, DatumWriter |
| |
| |
class AvroError(Exception):
    """Raised for user-facing errors in this command-line tool."""
| |
def print_json(row):
    """Dump *row* to stdout as a compact, single-line JSON document."""
    serialized = json.dumps(row)
    print(serialized)
| |
def print_json_pretty(row):
    """Dump *row* to stdout as JSON indented with 4 spaces."""
    text = json.dumps(row, indent=4)
    print(text)
| |
# A single CSV writer bound to stdout; every CSV output row goes through it.
_write_row = csv.writer(stdout).writerow
# Output encoding: stdout's detected encoding, falling back to UTF-8
# (stdout.encoding is None when output is piped).
_encoding = stdout.encoding or "UTF-8"
def _encode(v, encoding=_encoding):
    """Encode a text value to bytes for the CSV writer; non-strings pass through.

    Bug fix: the original body ignored the *encoding* parameter and always
    used the module-level ``_encoding`` default, making the parameter dead.
    """
    if not isinstance(v, basestring):  # NOTE: Python 2 only (basestring)
        return v
    return v.encode(encoding)
| |
def print_csv(row):
    """Write one record to stdout as a CSV row, columns ordered by key name."""
    # Keys are sorted so the fields land in the same column on every row.
    # FIXME: Do we want to do it in schema order?
    keys = sorted(row)
    _write_row([_encode(row[key]) for key in keys])
| |
def select_printer(format):
    """Map a format name ("json", "json-pretty" or "csv") to its printer.

    Raises KeyError for an unknown format name.  (The parameter name shadows
    the builtin ``format`` but is kept for caller compatibility.)
    """
    printers = {
        "json": print_json,
        "json-pretty": print_json_pretty,
        "csv": print_csv,
    }
    return printers[format]
| |
def record_match(expr, record):
    """Evaluate the --filter expression with the record bound to ``r``.

    NOTE(security): this deliberately runs eval() on an expression supplied
    on the command line by the operator; never feed it untrusted input.
    """
    namespace = {"r": record}
    return eval(expr, None, namespace)
| |
def parse_fields(fields):
    """Split a comma-separated --fields value into names; None if blank/absent."""
    text = (fields or "").strip()
    if not text:
        return None

    names = [name.strip() for name in text.split(",")]
    return [name for name in names if name]
| |
def field_selector(fields):
    """Return a function projecting a record onto *fields* (absent keys skipped)."""
    wanted = frozenset(fields)
    def keys_filter(obj):
        present = wanted.intersection(obj)
        return dict((key, obj[key]) for key in present)
    return keys_filter
| |
def print_avro(avro, opts):
    """Print records from the *avro* iterator according to *opts*.

    Honors --filter, --skip, --fields, --count, --header and --format.
    Raises AvroError when --header is combined with a non-CSV format.
    """
    if opts.header and (opts.format != "csv"):
        raise AvroError("--header applies only to CSV format")

    # Filtering happens first, so --skip and --count apply to matches only.
    if opts.filter:
        avro = (rec for rec in avro if record_match(opts.filter, rec))

    # Discard the first opts.skip records; stop quietly if there are fewer.
    for _ in range(opts.skip):
        try:
            next(avro)
        except StopIteration:
            return

    fields = parse_fields(opts.fields)
    if fields:
        select = field_selector(fields)
        avro = (select(rec) for rec in avro)

    printer = select_printer(opts.format)
    for index, record in enumerate(avro):
        # The CSV header row is derived from the first record's sorted keys.
        if index == 0 and opts.header:
            _write_row(sorted(record.keys()))
        if index >= opts.count:
            break
        printer(record)
| |
def print_schema(avro):
    """Pretty-print the writer schema stored in the container file metadata."""
    raw = avro.meta["avro.schema"]
    # Round-trip through json to re-serialize with indentation.
    print(json.dumps(json.loads(raw), indent=4))
| |
def cat(opts, args):
    """Print the records (or just the schema) of every Avro file in *args*.

    Raises AvroError when no files are given or a file cannot be opened.
    """
    if not args:
        raise AvroError("No files to show")

    for filename in args:
        try:
            fo = open(filename, "rb")
        except (OSError, IOError) as e:
            raise AvroError("Can't open %s - %s" % (filename, e))

        # Renamed from ``avro`` — the original shadowed the imported package.
        reader = DataFileReader(fo, DatumReader())
        try:
            if opts.print_schema:
                print_schema(reader)
            else:
                print_avro(reader, opts)
        finally:
            # Bug fix: the original leaked the reader/file handle for every
            # file processed; closing the reader also closes ``fo``.
            reader.close()
| |
| def _open(filename, mode): |
| if filename == "-": |
| return { |
| "rb" : stdin, |
| "wb" : stdout |
| }[mode] |
| |
| return open(filename, mode) |
| |
def iter_json(info, _):
    """Lazily parse one JSON document per line of *info* (the schema is unused)."""
    return (json.loads(line) for line in info)
| |
def convert(value, field):
    """Convert a CSV cell *value* (a string) to the Python type of *field*.

    Raises ValueError when the text cannot be converted and KeyError for an
    unsupported schema type.
    """
    avro_type = field.type.type  # renamed: ``type`` shadowed the builtin
    if avro_type == "union":
        return convert_union(value, field)

    # NOTE: int() auto-promotes to arbitrary precision in Python 2, so it is
    # safe for "long" as well (and is Python-3-ready, unlike ``long``).
    # The unreachable "union" entry was removed — the branch above returns
    # before this dict is consulted.
    return {
        "int": int,
        "long": int,
        "float": float,
        "double": float,
        "string": str,
        "bytes": str,
        "boolean": bool,
        "null": lambda _: None,
    }[avro_type](value)
| |
def convert_union(value, field):
    """Convert *value* using the first union member schema that accepts it.

    Bug fix: the original called ``convert(name)(value)`` — a TypeError,
    since ``convert`` takes two arguments and returns a value, not a
    callable — so every union field crashed.  Members are tried in
    declaration order; returns None when no member accepts the value.
    """
    # int() auto-promotes to arbitrary precision in Python 2, so it covers
    # both "int" and "long" members.
    casts = {
        "int": int,
        "long": int,
        "float": float,
        "double": float,
        "string": str,
        "bytes": str,
        "boolean": bool,
    }
    for member in field.type.schemas:
        member_type = member.type
        if member_type == "null":
            # Treat only an empty CSV cell as null; otherwise let another
            # member claim the value.  TODO(review): confirm this matches
            # the intended CSV representation of null.
            if value == "" or value is None:
                return None
            continue
        cast = casts.get(member_type)
        if cast is None:
            continue
        try:
            return cast(value)
        except ValueError:
            continue
| |
def iter_csv(info, schema):
    """Yield one record dict per CSV row of *info*, typed per *schema* fields."""
    names = [field.name for field in schema.fields]
    for line in csv.reader(info):
        converted = [convert(cell, field)
                     for cell, field in zip(line, schema.fields)]
        yield dict(zip(names, converted))
| |
def guess_input_type(files):
    """Guess "json" or "csv" from the first file's extension; None if unknown."""
    if not files:
        return None

    extension = splitext(files[0])[1].lower()
    if extension == ".csv":
        return "csv"
    if extension in (".json", ".js"):
        return "json"

    return None
| |
def write(opts, files):
    """Write an Avro container file from JSON or CSV input files.

    Raises AvroError when no schema is given, the input type cannot be
    determined, or a file cannot be opened.  Input files default to stdin
    ("-") when *files* is empty.
    """
    if not opts.schema:
        raise AvroError("No schema specified")

    input_type = opts.input_type or guess_input_type(files)
    if not input_type:
        raise AvroError("Can't guess input file type (not .json or .csv)")

    try:
        # Bug fix: the original leaked the schema file handle.
        with open(opts.schema, "rb") as schema_file:
            schema = avro.schema.parse(schema_file.read())
        out = _open(opts.output, "wb")
    except (IOError, OSError) as e:
        raise AvroError("Can't open file - %s" % e)

    iter_records = {"json": iter_json, "csv": iter_csv}[input_type]
    writer = DataFileWriter(out, DatumWriter(), schema)
    try:
        for filename in (files or ["-"]):
            info = _open(filename, "rb")
            try:
                for record in iter_records(info, schema):
                    writer.append(record)
            finally:
                # Bug fix: input files were never closed; keep the shared
                # stdin stream open, though.
                if info is not stdin:
                    info.close()
    finally:
        # Always flush/close the container, even if appending failed.
        writer.close()
| |
def main(argv=None):
    """Entry point: parse command-line options and dispatch to cat()/write().

    *argv* defaults to sys.argv; exits via optparse on usage errors and via
    SystemExit("panic: ...") on unexpected exceptions.
    """
    import sys
    from optparse import OptionParser, OptionGroup

    argv = argv or sys.argv

    parser = OptionParser(description="Display/write for Avro files",
                          version="@AVRO_VERSION@",
                          usage="usage: %prog cat|write [options] FILE [FILE...]")

    # cat options
    cat_options = OptionGroup(parser, "cat options")
    cat_options.add_option("-n", "--count", default=float("Infinity"),
                           help="number of records to print", type=int)
    cat_options.add_option("-s", "--skip", help="number of records to skip",
                           type=int, default=0)
    cat_options.add_option("-f", "--format", help="record format",
                           default="json",
                           choices=["json", "csv", "json-pretty"])
    cat_options.add_option("--header", help="print CSV header", default=False,
                           action="store_true")
    cat_options.add_option("--filter", help="filter records (e.g. r['age']>1)",
                           default=None)
    cat_options.add_option("--print-schema", help="print schema",
                           action="store_true", default=False)
    cat_options.add_option('--fields', default=None,
                           help='fields to show, comma separated (show all by default)')
    parser.add_option_group(cat_options)

    # write options
    write_options = OptionGroup(parser, "write options")
    write_options.add_option("--schema", help="schema file (required)")
    write_options.add_option("--input-type",
                             help="input file(s) type (json or csv)",
                             choices=["json", "csv"], default=None)
    write_options.add_option("-o", "--output", help="output file", default="-")
    parser.add_option_group(write_options)

    opts, args = parser.parse_args(argv[1:])
    if len(args) < 1:
        # Typo fix: the message previously read "You much specify".
        parser.error("You must specify `cat` or `write`")  # Will exit

    command = args.pop(0)
    try:
        if command == "cat":
            cat(opts, args)
        elif command == "write":
            write(opts, args)
        else:
            raise AvroError("Unknown command - %s" % command)
    except AvroError as e:
        parser.error("%s" % e)  # Will exit
    except Exception as e:
        raise SystemExit("panic: %s" % e)
| |
# Script entry point: run the CLI only when executed directly, not imported.
if __name__ == "__main__":
    main()