| #!/usr/bin/env python |
| |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Command line utility for reading and writing Avro files.""" |
| |
| from __future__ import absolute_import, division, print_function |
| |
| import csv |
| import itertools |
| import json |
| import os.path |
| import sys |
| from functools import partial |
| from optparse import OptionGroup, OptionParser |
| |
| import avro |
| import avro.schema |
| from avro.datafile import DataFileReader, DataFileWriter |
| from avro.io import DatumReader, DatumWriter |
| |
# Absolute path of the directory holding the imported avro package;
# VERSION.txt is looked up relative to it.
_AVRO_DIR = os.path.abspath(
    os.path.dirname(avro.__file__)
)
| |
# Python 2/3 compatibility aliases.
#
# Prefer the itertools variants when they exist (Python 2) and fall back to
# the builtins of the same purpose otherwise.
imap = getattr(itertools, 'imap', map)
ifilter = getattr(itertools, 'ifilter', filter)

# ``long`` and ``unicode`` are not defined on every Python version; alias
# them to ``int`` and ``str`` when the name lookup fails.
try:
    long
except NameError:
    long = int

try:
    unicode
except NameError:
    unicode = str
| |
| |
def _version():
    """Return the full, unmodified text of VERSION.txt in ``_AVRO_DIR``."""
    version_path = os.path.join(_AVRO_DIR, 'VERSION.txt')
    with open(version_path) as version_file:
        contents = version_file.read()
    return contents


# Read once at import time; main() hands it to OptionParser as ``version``.
_AVRO_VERSION = _version()
| |
| |
class AvroError(Exception):
    """Error raised by the command helpers in this module.

    main() catches it and reports the message through ``parser.error``.
    """
| |
| |
def print_json(row):
    """Print ``row`` to stdout as JSON using json.dumps defaults."""
    encoded = json.dumps(row)
    print(encoded)
| |
| |
def print_json_pretty(row):
    """Print ``row`` to stdout as JSON indented by 4 spaces, keys sorted."""
    pretty = json.dumps(row, sort_keys=True, indent=4)
    # Workaround for https://bugs.python.org/issue16333: json.dumps can leave
    # a space in front of each newline when indenting, so drop those spaces.
    print(pretty.replace(' \n', '\n'))
| |
| |
| _write_row = csv.writer(sys.stdout).writerow |
| |
| |
| def print_csv(row): |
| # We sort the keys to the fields will be in the same place |
| # FIXME: Do we want to do it in schema order? |
| _write_row([row[key] for key in sorted(row)]) |
| |
| |
def select_printer(format):
    """Return the print function registered for ``format``.

    Raises:
        KeyError: if ``format`` is not "json", "json-pretty" or "csv".
    """
    printers = {
        "csv": print_csv,
        "json": print_json,
        "json-pretty": print_json_pretty,
    }
    return printers[format]
| |
| |
def record_match(expr, record):
    """Evaluate ``expr`` with ``record`` bound to the name ``r``.

    Returns the value of the expression unchanged.

    NOTE: ``expr`` is passed to eval(), so it can run arbitrary Python code;
    in this file it comes from the ``--filter`` command-line option.
    """
    local_names = {"r": record}
    return eval(expr, None, local_names)
| |
| |
def parse_fields(fields):
    """Split a comma separated field list into a list of stripped names.

    Returns None when ``fields`` is None, empty or only whitespace. Empty
    items between commas are dropped, so "," alone yields an empty list.
    """
    raw = fields if fields else ''
    if raw.strip() == '':
        return None

    names = []
    for piece in raw.split(','):
        name = piece.strip()
        if name:
            names.append(name)
    return names
| |
| |
def field_selector(fields):
    """Build a function that keeps only the keys named in ``fields``.

    Args:
        fields: iterable of key names to keep.

    Returns:
        A function taking a mapping and returning a new dict restricted to
        the requested keys that are present in it. Requested keys missing
        from the mapping are ignored; the input mapping is not modified.
    """
    wanted = frozenset(fields)

    def keys_filter(obj):
        # Walk the record's own keys instead of a set intersection so the
        # selected keys keep the record's iteration order; set order is
        # arbitrary and can differ between runs for string keys.
        return dict((key, obj[key]) for key in obj if key in wanted)
    return keys_filter
| |
| |
def print_avro(avro, opts):
    """Print the records of ``avro`` according to the cat options in ``opts``.

    The steps run in this order: filter (``opts.filter``), skip
    (``opts.skip`` records), field selection (``opts.fields``), then printing
    in ``opts.format`` until ``opts.count`` records have been printed.

    Raises:
        AvroError: if ``opts.header`` is set for a format other than "csv".
    """
    if opts.header and opts.format != "csv":
        raise AvroError("--header applies only to CSV format")

    records = avro
    # Filtering comes first so that skip and count only see matching records.
    if opts.filter:
        matches = partial(record_match, opts.filter)
        records = ifilter(matches, records)

    # Discard the leading records; print nothing if the input runs out.
    for _ in range(opts.skip):
        try:
            next(records)
        except StopIteration:
            return

    wanted = parse_fields(opts.fields)
    if wanted:
        records = imap(field_selector(wanted), records)

    emit = select_printer(opts.format)
    for position, record in enumerate(records):
        # The CSV header is the sorted key list of the first record.
        if opts.header and position == 0:
            _write_row(sorted(record.keys()))
        if position >= opts.count:
            break
        emit(record)
| |
| |
def print_schema(avro):
    """Pretty-print the JSON text found in ``avro.schema``.

    The text is parsed and re-serialised with a 4-space indent.
    """
    parsed = json.loads(avro.schema)
    print(json.dumps(parsed, indent=4))
| |
| |
def cat(opts, args):
    """Show the schema or the records of each file named in ``args``.

    Args:
        opts: parsed command-line options; ``opts.print_schema`` selects
            schema output, otherwise the records go through print_avro.
        args: list of file names to read.

    Raises:
        AvroError: if ``args`` is empty or a file cannot be opened.
    """
    if not args:
        raise AvroError("No files to show")
    for filename in args:
        # Report an unreadable file the same way write() does, rather than
        # letting the I/O error escape main() as a traceback.
        try:
            handle = open(filename, 'rb')
        except (IOError, OSError) as e:
            raise AvroError("Can't open file - %s" % e)
        # The outer ``with`` closes the handle even when DataFileReader
        # cannot be constructed; closing a file twice is harmless.
        with handle:
            with DataFileReader(handle, DatumReader()) as reader:
                if opts.print_schema:
                    print_schema(reader)
                    continue
                print_avro(reader, opts)
| |
| |
| def _open(filename, mode): |
| if filename == "-": |
| return { |
| "rb": sys.stdin, |
| "wb": sys.stdout |
| }[mode] |
| |
| return open(filename, mode) |
| |
| |
def iter_json(info, _):
    """Yield the parsed JSON value of every item in ``info``.

    Items that have a ``decode`` method (bytes) are decoded first; items
    without one are parsed as they are. The second argument is ignored.
    """
    def as_text(chunk):
        try:
            return chunk.decode()
        except AttributeError:
            return chunk

    for chunk in info:
        yield json.loads(as_text(chunk))
| |
| |
def _parse_boolean(text):
    """Interpret a CSV cell as a boolean.

    "true"/"1" and "false"/"0" are accepted in any letter case and an empty
    cell is False; anything else raises ValueError. Plain bool() cannot be
    used because every non-empty string, "False" included, is truthy.
    """
    token = text.strip().lower()
    if token in ("true", "1"):
        return True
    if token in ("false", "0", ""):
        return False
    raise ValueError("not a boolean value: %r" % (text,))


def _to_bytes(text):
    """Return a CSV cell as bytes, encoding text as UTF-8.

    bytes(text) alone raises TypeError for a str on Python 3.
    """
    if isinstance(text, bytes):
        return text
    return text.encode("utf-8")


def _convert_schema(value, schema):
    """Convert the CSV cell ``value`` to the type named by ``schema.type``.

    For a union the branches in ``schema.schemas`` are tried in order and the
    first conversion that does not raise ValueError wins; None is returned
    when no branch accepts the value.

    Raises:
        ValueError: if ``value`` cannot be converted to a non-union type.
        KeyError: if the type name has no CSV conversion.
    """
    type_name = schema.type
    if type_name == "union":
        for branch in schema.schemas:
            # The "null" conversion accepts any text, so inside a union only
            # an empty cell is read as null; otherwise a leading "null"
            # branch would swallow every value.
            if branch.type == "null" and value != "":
                continue
            try:
                return _convert_schema(value, branch)
            except ValueError:
                continue
        return None

    converters = {
        "int": int,
        "long": long,
        "float": float,
        "double": float,
        "string": unicode,
        "bytes": _to_bytes,
        "boolean": _parse_boolean,
        "null": lambda _: None,
    }
    return converters[type_name](value)


def convert(value, field):
    """Convert the CSV cell ``value`` to the Python type of ``field``.

    Args:
        value: the cell text.
        field: object whose ``type`` attribute is the schema; the schema's
            own ``type`` attribute is the type name.

    Returns:
        The converted value; None for a "null" field or for a union in which
        no branch accepts the value.

    Raises:
        ValueError: if ``value`` cannot be converted.
        KeyError: if the type name has no CSV conversion.
    """
    return _convert_schema(value, field.type)


def convert_union(value, field):
    """Convert ``value`` using the first matching branch of a union field.

    ``field.type`` is expected to be a union schema; its branches are read
    from ``field.type.schemas`` and tried in declaration order. Returns None
    when no branch accepts the value.
    """
    return _convert_schema(value, field.type)
| |
| |
def iter_csv(info, schema):
    """Yield one dict per CSV row read from ``info``.

    Cells are paired with ``schema.fields`` by position and converted with
    convert(); the dict keys are the field names. zip() stops at the shorter
    side, so extra cells or extra fields in a row are dropped.
    """
    names = [field.name for field in schema.fields]
    # Items with a decode() method are decoded; others pass through as-is.
    text_lines = (getattr(chunk, "decode", lambda: chunk)() for chunk in info)
    for cells in csv.reader(text_lines):
        converted = [
            convert(cell, field)
            for cell, field in zip(cells, schema.fields)
        ]
        yield dict(zip(names, converted))
| |
| |
def guess_input_type(files):
    """Guess the input format from the extension of the first file name.

    Returns "json" for .json/.js, "csv" for .csv (case-insensitive), and
    None for any other extension or when ``files`` is empty.
    """
    if not files:
        return None

    _, extension = os.path.splitext(files[0])
    by_extension = {".json": "json", ".js": "json", ".csv": "csv"}
    return by_extension.get(extension.lower())
| |
| |
def write(opts, files):
    """Write the records read from ``files`` to an Avro data file.

    Args:
        opts: parsed command-line options; uses ``opts.schema`` (schema file
            path), ``opts.input_type`` ("json", "csv" or None to guess from
            the first file name) and ``opts.output`` ("-" for stdout).
        files: input file names; standard input is read when empty.

    Raises:
        AvroError: if no schema is given, the input type cannot be
            determined, or a schema, output or input file cannot be opened.
    """
    if not opts.schema:
        raise AvroError("No schema specified")

    input_type = opts.input_type or guess_input_type(files)
    if not input_type:
        raise AvroError("Can't guess input file type (not .json or .csv)")
    iter_records = {"json": iter_json, "csv": iter_csv}[input_type]

    try:
        with open(opts.schema) as schema_file:
            schema = avro.schema.parse(schema_file.read())
        out = _open(opts.output, "wb")
    except (IOError, OSError) as e:
        raise AvroError("Can't open file - %s" % e)

    # A text-mode stdout exposes its binary stream as ``buffer``; real files
    # opened in "wb" mode are used directly.
    writer = DataFileWriter(getattr(out, 'buffer', out), DatumWriter(), schema)
    try:
        for filename in (files or ["-"]):
            try:
                info = _open(filename, "rb")
            except (IOError, OSError) as e:
                raise AvroError("Can't open file - %s" % e)
            try:
                for record in iter_records(info, schema):
                    writer.append(record)
            finally:
                # Close every input we opened; stdin is not ours to close.
                if info is not sys.stdin:
                    info.close()
    finally:
        # Always close the writer, including when an input fails part-way.
        writer.close()
| |
| |
def main(argv=None):
    """Parse the command line and run the ``cat`` or ``write`` command.

    Args:
        argv: full argument vector including the program name in position 0;
            ``sys.argv`` is used when it is None or empty.

    A missing or unknown command, and any AvroError raised by the command,
    is reported through ``parser.error`` (marked "Will exit" below).
    """
    argv = argv or sys.argv

    parser = OptionParser(description="Display/write for Avro files",
                          version=_AVRO_VERSION,
                          usage="usage: %prog cat|write [options] FILE [FILE...]")

    # cat options
    cat_options = OptionGroup(parser, "cat options")
    cat_options.add_option("-n", "--count", default=float("Infinity"),
                           help="number of records to print", type=int)
    cat_options.add_option("-s", "--skip", help="number of records to skip",
                           type=int, default=0)
    cat_options.add_option("-f", "--format", help="record format",
                           default="json",
                           choices=["json", "csv", "json-pretty"])
    cat_options.add_option("--header", help="print CSV header", default=False,
                           action="store_true")
    cat_options.add_option("--filter", help="filter records (e.g. r['age']>1)",
                           default=None)
    cat_options.add_option("--print-schema", help="print schema",
                           action="store_true", default=False)
    cat_options.add_option('--fields', default=None,
                           help='fields to show, comma separated (show all by default)')
    parser.add_option_group(cat_options)

    # write options
    write_options = OptionGroup(parser, "write options")
    write_options.add_option("--schema", help="schema file (required)")
    write_options.add_option("--input-type",
                             help="input file(s) type (json or csv)",
                             choices=["json", "csv"], default=None)
    write_options.add_option("-o", "--output", help="output file", default="-")
    parser.add_option_group(write_options)

    opts, args = parser.parse_args(argv[1:])
    if len(args) < 1:
        parser.error("You must specify `cat` or `write`")  # Will exit

    # The first positional argument is the command; the rest are file names.
    command = args.pop(0)
    try:
        if command == "cat":
            cat(opts, args)
        elif command == "write":
            write(opts, args)
        else:
            raise AvroError("Unknown command - %s" % command)
    except AvroError as e:
        parser.error("%s" % e)  # Will exit


if __name__ == "__main__":
    main()