| #!/usr/bin/env python3 |
| # -*- mode: python -*- |
| # -*- coding: utf-8 -*- |
| |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Command line utility for reading and writing Avro files.""" |
| |
| import argparse |
| import csv |
| import functools |
| import itertools |
| import json |
| import os |
| import sys |
| import traceback |
| |
| import avro |
| from avro import datafile |
| from avro import io as avro_io |
| from avro import schema |
| |
| |
| # ------------------------------------------------------------------------------ |
| |
| |
class AvroError(Exception):
    """Failure raised by this tool's commands.

    ``main`` catches this exception and reports its message via
    ``parser.error`` instead of printing a traceback.
    """
| |
| |
def print_json(row):
    """Write ``row`` to standard output as one compact JSON document."""
    encoded = json.dumps(row)
    print(encoded)
| |
| |
def print_json_pretty(row):
    """Write ``row`` to standard output as indented JSON with sorted keys."""
    rendered = json.dumps(row, sort_keys=True, indent=4)
    print(rendered)
| |
# Shared CSV writer; it is bound to whatever ``sys.stdout`` is at import time.
_csv_writer = csv.writer(sys.stdout)


def _write_row(row):
    """Emit ``row`` as a single CSV line through the module-level writer."""
    emit = _csv_writer.writerow
    emit(row)
| |
| |
def print_csv(row):
    """Write the values of ``row`` as one CSV line.

    Values are ordered by sorted field name so that every record, and the
    optional header, share the same column layout.
    """
    ordered_names = sorted(row)
    _write_row([row[name] for name in ordered_names])
| |
| |
def select_printer(format):
    """Return the record printer registered for ``format``.

    Known formats are ``'json'``, ``'json-pretty'`` and ``'csv'``; any other
    value raises ``KeyError``.
    """
    printers = {
        'csv': print_csv,
        'json-pretty': print_json_pretty,
        'json': print_json,
    }
    return printers[format]
| |
| |
def record_match(expr, record):
    """Evaluate the filter expression ``expr`` against ``record``.

    The expression is run with ``eval`` and sees the record under the local
    name ``r``; whatever the expression evaluates to is returned.

    NOTE(review): ``eval`` executes arbitrary Python, so ``expr`` must only
    ever come from a trusted source (here: the user's own command line).
    """
    local_names = {'r': record}
    return eval(expr, None, local_names)
| |
| |
def parse_fields(fields):
    """Split a comma separated field list into cleaned field names.

    Returns ``None`` when ``fields`` is falsy or contains only whitespace;
    otherwise returns the non-empty, whitespace-stripped names in order.
    """
    raw = fields if fields else ''
    if raw.strip() == '':
        return None

    stripped = (piece.strip() for piece in raw.split(','))
    return [name for name in stripped if name]
| |
| |
def field_selector(fields):
    """Build a function that projects a record onto the given field names.

    Args:
        fields: iterable of field names to keep.

    Returns:
        A callable taking a mapping and returning a new dict with only the
        keys that are both in the mapping and in ``fields``.
    """
    wanted = frozenset(fields)

    def keys_filter(obj):
        # Walk the record itself rather than a set intersection: set order is
        # arbitrary (and varies with string hash randomisation), which made
        # the key order of the printed output unstable between runs.
        return {key: obj[key] for key in obj if key in wanted}

    return keys_filter
| |
| |
def print_avro(avro, opts):
    """Print records from ``avro`` as directed by the ``cat`` options.

    Steps, in order: apply the ``opts.filter`` expression, drop the first
    ``opts.skip`` remaining records, project onto ``opts.fields``, then print
    up to ``opts.count`` records with the printer for ``opts.format``.

    Raises:
        AvroError: if ``opts.header`` is set for a format other than CSV.
    """
    if opts.format != 'csv' and opts.header:
        raise AvroError('--header applies only to CSV format')

    records = avro

    # Filtering happens before skipping, so --skip counts matching records.
    expression = opts.filter
    if expression:
        records = filter(lambda rec: record_match(expression, rec), records)

    # Discard skipped records; stop quietly if the input runs out first.
    exhausted = object()
    for _ in range(opts.skip):
        if next(records, exhausted) is exhausted:
            return

    selected = parse_fields(opts.fields)
    if selected:
        records = map(field_selector(selected), records)

    emit = select_printer(opts.format)
    limit = opts.count
    for index, record in enumerate(records):
        # The header is derived from the first record's keys, in sorted order.
        if index == 0 and opts.header:
            _write_row(sorted(record.keys()))
        if index >= limit:
            break
        emit(record)
| |
| |
def print_schema(avro):
    """Pretty-print the schema stored under ``avro.meta['avro.schema']``.

    The metadata value is decoded as UTF-8, parsed as JSON and re-serialised
    with a four-space indent.
    """
    raw_schema = avro.meta['avro.schema']
    parsed = json.loads(raw_schema.decode('utf-8'))
    print(json.dumps(parsed, indent=4))
| |
| |
def cat(opts, args):
    """Show the schema or the records of each Avro file named in ``args``.

    Args:
        opts: parsed command line options; ``opts.print_schema`` selects
            schema output, otherwise the record options are handed to
            ``print_avro``.
        args: list of input file names.

    Raises:
        AvroError: if no file is given or a file cannot be opened.
    """
    if not args:
        raise AvroError('No files to show')

    for filename in args:
        try:
            fo = open(filename, 'rb')
        except (OSError, IOError) as e:
            raise AvroError('Cannot open %s - %s' % (filename, e)) from e

        # Close each input when done with it, including on the schema-only
        # ``continue`` path and on errors, instead of leaking the handle.
        with fo:
            reader = datafile.DataFileReader(fo, avro_io.DatumReader())

            if opts.print_schema:
                print_schema(reader)
                continue

            print_avro(reader, opts)
| |
| |
| # ------------------------------------------------------------------------------ |
| |
| |
def _open(filename, mode):
    """Open ``filename`` in ``mode``, treating ``'-'`` as a standard stream.

    For ``'-'``, mode ``'rt'`` yields ``sys.stdin`` and mode ``'wb'`` yields
    ``sys.stdout.buffer``; any other mode raises ``KeyError``.
    """
    if filename != '-':
        return open(filename, mode)

    std_streams = {
        'rt': sys.stdin,
        'wb': sys.stdout.buffer,
    }
    return std_streams[mode]
| |
| |
def iter_json(info, schema):
    """Lazily decode line-delimited JSON records from ``info``.

    Args:
        info: iterable of text lines, each holding one JSON document.
        schema: unused; accepted so the signature matches ``iter_csv``.

    Returns:
        An iterator over the decoded records.
    """
    # Blank or whitespace-only lines (e.g. a trailing newline) carry no
    # record and would make json.loads raise, so they are skipped.
    return (json.loads(line) for line in info if line.strip())
| |
| |
# Spellings of a CSV cell that mean "false" (compared case-insensitively).
_FALSE_SPELLINGS = ('', '0', 'f', 'false', 'n', 'no')


def _to_boolean(value):
    """Interpret ``value`` as a boolean."""
    # bool('false') is True, so text is matched against the known false
    # spellings; any other text stays truthy as before.
    if isinstance(value, str):
        return value.strip().lower() not in _FALSE_SPELLINGS
    return bool(value)


def _to_bytes(value):
    """Interpret ``value`` as bytes."""
    # bytes('text') raises TypeError on Python 3; text must be encoded.
    if isinstance(value, str):
        return value.encode('utf-8')
    return bytes(value)


# Converter for each primitive schema type name handled by ``convert``.
_PRIMITIVE_CONVERTERS = {
    'int': int,
    'long': int,
    'float': float,
    'double': float,
    'string': str,
    'bytes': _to_bytes,
    'boolean': _to_boolean,
    'null': lambda _: None,
}


def _convert_for_schema(value, value_schema):
    """Convert ``value`` according to ``value_schema.type``."""
    type_name = value_schema.type
    if type_name == 'union':
        return _convert_for_branches(value, value_schema.schemas)
    return _PRIMITIVE_CONVERTERS[type_name](value)


def _convert_for_branches(value, branches):
    """Return ``value`` converted by the first union branch that accepts it."""
    for branch in branches:
        if branch.type == 'null':
            # The null converter accepts anything, so an unconditional match
            # would turn every value of a ["null", X] union into None.  Only
            # an empty cell is treated as null.
            if value is None or value == '':
                return None
            continue
        try:
            return _convert_for_schema(value, branch)
        except ValueError:
            continue
    raise ValueError('No union branch can represent %r' % (value,))


def convert(value, field):
    """Convert the text ``value`` to the Python type required by ``field``.

    Args:
        value: raw cell value (a string when it comes from ``csv.reader``).
        field: object whose ``type`` attribute is the field's schema; the
            schema's own ``type`` attribute names the Avro type.

    Returns:
        The converted value.

    Raises:
        ValueError: if the text cannot be converted, or no union branch fits.
        KeyError: if the schema type has no converter.
    """
    return _convert_for_schema(value, field.type)


def convert_union(value, field):
    """Convert ``value`` using the first matching branch of a union field.

    Branches are tried in the order of ``field.type.schemas``.

    Raises:
        ValueError: if no branch can represent ``value``.
    """
    return _convert_for_branches(value, field.type.schemas)
| |
| |
def iter_csv(info, schema):
    """Yield one record dict per CSV row read from ``info``.

    Cells are matched positionally with ``schema.fields``; each cell is
    converted with ``convert`` and stored under the field's name.  Cells or
    fields without a positional partner are dropped.
    """
    columns = schema.fields
    for cells in csv.reader(info):
        yield {
            column.name: convert(cell, column)
            for cell, column in zip(cells, columns)
        }
| |
| |
def guess_input_type(files):
    """Guess the input format from the extension of the first file name.

    Returns ``'json'`` for ``.json``/``.js``, ``'csv'`` for ``.csv`` (case
    insensitive), and ``None`` when ``files`` is empty or the extension is
    not recognised.
    """
    if not files:
        return None

    _, extension = os.path.splitext(files[0])
    known_extensions = {
        '.json': 'json',
        '.js': 'json',
        '.csv': 'csv',
    }
    return known_extensions.get(extension.lower())
| |
| |
def write(opts, files):
    """Convert JSON or CSV input files into a single Avro output file.

    Args:
        opts: parsed command line options; reads ``opts.schema`` (path of the
            schema file), ``opts.input_type`` (``'json'``, ``'csv'`` or
            ``None`` to guess from the first file name) and ``opts.output``
            (output path, ``'-'`` for standard output).
        files: input file names; when empty, standard input is read.

    Raises:
        AvroError: if no schema is given, the input type cannot be
            determined, or the schema/output file cannot be opened.
    """
    if not opts.schema:
        raise AvroError('No schema specified')

    input_type = opts.input_type or guess_input_type(files)
    if not input_type:
        raise AvroError('Cannot guess input file type (not .json or .csv)')

    try:
        with open(opts.schema, 'rt') as f:
            json_schema = f.read()
        writer_schema = schema.Parse(json_schema)
        out = _open(opts.output, 'wb')
    except (IOError, OSError) as e:
        raise AvroError('Cannot open file - %s' % e) from e

    record_parser_map = {
        'json': iter_json,
        'csv': iter_csv,
    }

    with datafile.DataFileWriter(
        writer=out,
        datum_writer=avro_io.DatumWriter(),
        writer_schema=writer_schema,
    ) as writer:
        iter_records = record_parser_map[input_type]
        for filename in (files or ['-']):
            reader = _open(filename, 'rt')
            try:
                for record in iter_records(reader, writer_schema):
                    writer.append(record)
            finally:
                # Release each input file once consumed; standard input is
                # shared with the rest of the process and is left open.
                if reader is not sys.stdin:
                    reader.close()
| |
| |
| # ------------------------------------------------------------------------------ |
| |
| |
def _build_parser():
    """Create the argument parser holding both ``cat`` and ``write`` options."""
    parser = argparse.ArgumentParser(
        description='Display/write for Avro files',
        usage='%(prog)s cat|write [options] FILE [FILE...]',
    )

    parser.add_argument(
        '--version',
        action='version',
        version='%(prog)s ' + avro.VERSION,
    )

    # cat options:
    cat_options = parser.add_argument_group(title='cat options')
    cat_options.add_argument(
        '-n', '--count',
        type=int,
        # Infinity compares greater than any record index, i.e. "no limit".
        default=float('Infinity'),
        help='number of records to print',
    )
    cat_options.add_argument(
        '-s', '--skip',
        type=int,
        default=0,
        help='number of records to skip',
    )
    cat_options.add_argument(
        '-f', '--format',
        default='json',
        choices=['json', 'csv', 'json-pretty'],
        help='record format',
    )
    cat_options.add_argument(
        '--header',
        default=False,
        action='store_true',
        help='print CSV header',
    )
    cat_options.add_argument(
        '--filter',
        default=None,
        help='filter records (e.g. r["age"]>1)',
    )
    cat_options.add_argument(
        '--print-schema',
        default=False,
        action='store_true',
        help='print schema',
    )
    cat_options.add_argument(
        '--fields',
        default=None,
        help='fields to show, comma separated (show all by default)',
    )

    # write options
    write_options = parser.add_argument_group(title='write options')
    write_options.add_argument(
        '--schema',
        help='schema file (required)',
    )
    write_options.add_argument(
        '--input-type',
        choices=['json', 'csv'],
        default=None,
        help='input file(s) type (json or csv)',
    )
    write_options.add_argument(
        '-o', '--output',
        default='-',
        help='output file',
    )

    return parser


def main(argv=None):
    """Run the command line tool.

    Args:
        argv: full argument vector including the program name; defaults to
            ``sys.argv`` when omitted or empty.

    The first positional argument selects the command (``cat`` or ``write``)
    and the remaining ones are passed to it as file names.  ``AvroError`` is
    reported through ``parser.error``; any other exception prints a traceback
    and exits with a ``panic:`` message.
    """
    argv = argv or sys.argv

    parser = _build_parser()

    # parse_known_args leaves the command word and file names in ``args``.
    opts, args = parser.parse_known_args(argv[1:])
    if not args:
        parser.error('You must specify `cat` or `write`.')

    command = args.pop(0)
    try:
        if command == 'cat':
            cat(opts, args)
        elif command == 'write':
            write(opts, args)
        else:
            raise AvroError('Unknown command - %s' % command)
    except AvroError as e:
        parser.error('%s' % e)  # Will exit
    except Exception as e:
        traceback.print_exc()
        raise SystemExit('panic: %s' % e)


if __name__ == '__main__':
    main()