AVRO-2906: Traversal validation (#936)

* AVRO-2906: Convert validation to a traversal-based approach

Use schema-type specific iterators and validators to allow a
breadth-first traversal of a full schema, validating each node
as you go.

The benefit of this approach is that it allows us to pin-point
the specific part of the schema that has failed validation.
Where previously the error message for a large schema would print
the entire datum as well as the full schema and say "this is not
that", this new approach will print the specific sub-schema that has
failed in order to allow more informative errors.

A second improvement is that by traversing the schema instead of
processing it recursively, the algorithm is more efficient in use
of system resources.  In particular for schemas that have lots of
nested parts, this will make a difference.

Make the required changes to pass tests in all supported python versions.

This commit removes type hints present in the first commit in order to
allow using the code in older Python versions.

In addition:
  * the use of `str` has been replaced by the compatible `unicode`.
  * the ValidationNode namedtuple has been re-expressed in syntax available
    in all supported Python versions.
  * the use of a custom InvalidEvent exception has been replace by using
    AvroTypeException
  * all specific single-type validators have been replaced by partials of
    _validate_type with a tuple of one or more type objects.

Fix typos and raise StopIteration as suggested in code review

Move the responsibility for validation to the Schema class.

Each schema subclass will be responsible for its own validation. This
simplifies the structure of io.py, removes the dict lookup of validators,
and reduces somewhat the repetition that was in io.py.

Move validators to a class attribute and update method code.

This makes things look a little bit cleaner than having the validators right in the midst of the method.

Add arg spec docs to docstring for base Schema class.

Clean up mistakes.

* Fix a docstring to be a more accurate statement of reality.
* Remove an unused import.
* Remove extra blank lines.
diff --git a/lang/py/avro/io.py b/lang/py/avro/io.py
index d420c20..a476a7d 100644
--- a/lang/py/avro/io.py
+++ b/lang/py/avro/io.py
@@ -41,6 +41,49 @@
   * Schema floats are implemented as float.
   * Schema doubles are implemented as float.
   * Schema booleans are implemented as bool.
+
+Validation:
+
+The validation of schema is performed using breadth-first graph
+traversal. This allows validation exceptions to pinpoint the exact node
+within a complex schema that is problematic, simplifying debugging
+considerably. Because it is a traversal, it will also be less
+resource-intensive, particularly when validating schema with deep
+structures.
+
+Components
+==========
+
+Nodes
+-----
+Avro schemas contain many different schema types. Data about the schema
+types is used to validate the data in the corresponding part of a Python
+body (the object to be serialized). A node combines a given schema type
+with the corresponding Python data, as well as an optional "name" to
+identify the specific node. Names are generally the name of a schema
+(for named schema) or the name of a field (for child nodes of schema
+with named children like maps and records), or None, for schema who's
+children are not named (like Arrays).
+
+Iterators
+---------
+Iterators are generator functions that take a node and return a
+generator which will yield a node for each child datum in the data for
+the current node. If a node is of a type which has no children, then the
+default iterator will immediately exit.
+
+Validators
+----------
+Validators are used to determine if the datum for a given node is valid
+according to the given schema type. Validator functions take a node as
+an argument and return a node if the node datum passes validation. If it
+does not, the validator must return None.
+
+In most cases, the node returned is identical to the node provided (is
+in fact the same object). However, in the case of Union schema, the
+returned "valid" node will hold the schema that is represented by the
+datum contained. This allows iteration over the child nodes
+in that datum, if there are any.
 """
 
 from __future__ import absolute_import, division, print_function
@@ -48,7 +91,7 @@
 import datetime
 import json
 import struct
-import sys
+from collections import deque, namedtuple
 from decimal import Decimal, getcontext
 from struct import Struct
 
@@ -75,13 +118,6 @@
 # Constants
 #
 
-_DEBUG_VALIDATE_INDENT = 0
-_DEBUG_VALIDATE = False
-
-INT_MIN_VALUE = -(1 << 31)
-INT_MAX_VALUE = (1 << 31) - 1
-LONG_MIN_VALUE = -(1 << 63)
-LONG_MAX_VALUE = (1 << 63) - 1
 
 # TODO(hammer): shouldn't ! be < for little-endian (according to spec?)
 STRUCT_FLOAT = Struct('<f')           # big-endian float
@@ -96,79 +132,103 @@
 #
 
 
-def _is_timezone_aware_datetime(dt):
-    return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None
+ValidationNode = namedtuple("ValidationNode", ['schema', 'datum', 'name'])
 
 
-_valid = {
-    'null': lambda s, d: d is None,
-    'boolean': lambda s, d: isinstance(d, bool),
-    'string': lambda s, d: isinstance(d, unicode),
-    'bytes': lambda s, d: ((isinstance(d, bytes)) or
-                           (isinstance(d, Decimal) and
-                            getattr(s, 'logical_type', None) == constants.DECIMAL)),
-    'int': lambda s, d: ((isinstance(d, (int, long))) and (INT_MIN_VALUE <= d <= INT_MAX_VALUE) or
-                         (isinstance(d, datetime.date) and
-                          getattr(s, 'logical_type', None) == constants.DATE) or
-                         (isinstance(d, datetime.time) and
-                          getattr(s, 'logical_type', None) == constants.TIME_MILLIS)),
-    'long': lambda s, d: ((isinstance(d, (int, long))) and (LONG_MIN_VALUE <= d <= LONG_MAX_VALUE) or
-                          (isinstance(d, datetime.time) and
-                           getattr(s, 'logical_type', None) == constants.TIME_MICROS) or
-                          (isinstance(d, datetime.date) and
-                           _is_timezone_aware_datetime(d) and
-                           getattr(s, 'logical_type', None) in (constants.TIMESTAMP_MILLIS,
-                                                                constants.TIMESTAMP_MICROS))),
-    'float': lambda s, d: isinstance(d, (int, long, float)),
-    'fixed': lambda s, d: ((isinstance(d, bytes) and len(d) == s.size) or
-                           (isinstance(d, Decimal) and
-                            getattr(s, 'logical_type', None) == constants.DECIMAL)),
-    'enum': lambda s, d: d in s.symbols,
+def validate(expected_schema, datum, raise_on_error=False):
+    """Return True if the provided datum is valid for the expected schema
 
-    'array': lambda s, d: isinstance(d, list) and all(validate(s.items, item) for item in d),
-    'map': lambda s, d: (isinstance(d, dict) and all(isinstance(key, unicode) for key in d) and
-                         all(validate(s.values, value) for value in d.values())),
-    'union': lambda s, d: any(validate(branch, d) for branch in s.schemas),
-    'record': lambda s, d: (isinstance(d, dict) and
-                            all(validate(f.type, d.get(f.name)) for f in s.fields) and
-                            {f.name for f in s.fields}.issuperset(d.keys())),
-}
-_valid['double'] = _valid['float']
-_valid['error_union'] = _valid['union']
-_valid['error'] = _valid['request'] = _valid['record']
+    If raise_on_error is passed and True, then raise a validation error
+    with specific information about the error encountered in validation.
 
-
-def validate(expected_schema, datum):
-    """Determines if a python datum is an instance of a schema.
-
-    Args:
-      expected_schema: Schema to validate against.
-      datum: Datum to validate.
-    Returns:
-      True if the datum is an instance of the schema.
+    :param expected_schema: An avro schema type object representing the schema against
+                            which the datum will be validated.
+    :param datum: The datum to be validated, A python dictionary or some supported type
+    :param raise_on_error: True if a AvroTypeException should be raised immediately when a
+                           validation problem is encountered.
+    :raises: AvroTypeException if datum is invalid and raise_on_error is True
+    :returns: True if datum is valid for expected_schema, False if not.
     """
-    global _DEBUG_VALIDATE_INDENT
-    global _DEBUG_VALIDATE
-    expected_type = expected_schema.type
-    name = getattr(expected_schema, 'name', '')
-    if name:
-        name = ' ' + name
-    if expected_type in ('array', 'map', 'union', 'record'):
-        if _DEBUG_VALIDATE:
-            print('{!s}{!s}{!s}: {!s} {{'.format(' ' * _DEBUG_VALIDATE_INDENT, expected_schema.type, name, type(datum).__name__), file=sys.stderr)
-            _DEBUG_VALIDATE_INDENT += 2
-            if datum is not None and not datum:
-                print('{!s}<Empty>'.format(' ' * _DEBUG_VALIDATE_INDENT), file=sys.stderr)
-        result = _valid[expected_type](expected_schema, datum)
-        if _DEBUG_VALIDATE:
-            _DEBUG_VALIDATE_INDENT -= 2
-            print('{!s}}} -> {!s}'.format(' ' * _DEBUG_VALIDATE_INDENT, result), file=sys.stderr)
-    else:
-        result = _valid[expected_type](expected_schema, datum)
-        if _DEBUG_VALIDATE:
-            print('{!s}{!s}{!s}: {!s} -> {!s}'.format(' ' * _DEBUG_VALIDATE_INDENT,
-                  expected_schema.type, name, type(datum).__name__, result), file=sys.stderr)
-    return result
+    # use a FIFO queue to process schema nodes breadth first.
+    nodes = deque()
+    nodes.append(ValidationNode(expected_schema, datum, getattr(expected_schema, "name", None)))
+
+    while nodes:
+        current_node = nodes.popleft()
+
+        # _validate_node returns the node for iteration if it is valid. Or it returns None
+        # if current_node.schema.type in {'array', 'map', 'record'}:
+        validated_schema = current_node.schema.validate(current_node.datum)
+        if validated_schema:
+            valid_node = ValidationNode(validated_schema, current_node.datum, current_node.name)
+        else:
+            valid_node = None
+        # else:
+        #     valid_node = _validate_node(current_node)
+
+        if valid_node is not None:
+            # if there are children of this node to append, do so.
+            for child_node in _iterate_node(valid_node):
+                nodes.append(child_node)
+        else:
+            # the current node was not valid.
+            if raise_on_error:
+                raise avro.errors.AvroTypeException(current_node.schema, current_node.datum)
+            else:
+                # preserve the prior validation behavior of returning false when there are problems.
+                return False
+
+    return True
+
+
+def _iterate_node(node):
+    for item in _ITERATORS.get(node.schema.type, _default_iterator)(node):
+        yield ValidationNode(*item)
+
+
+#############
+# Iteration #
+#############
+
+def _default_iterator(_):
+    """Immediately raise StopIteration.
+
+    This exists to prevent problems with iteration over unsupported container types.
+
+    More efficient approaches are not possible due to support for Python 2.7
+    """
+    for item in ():
+        yield item
+
+
+def _record_iterator(node):
+    """Yield each child node of the provided record node."""
+    schema, datum, name = node
+    for field in schema.fields:
+        yield ValidationNode(field.type, datum.get(field.name), field.name)  # type: ignore
+
+
+def _array_iterator(node):
+    """Yield each child node of the provided array node."""
+    schema, datum, name = node
+    for item in datum:  # type: ignore
+        yield ValidationNode(schema.items, item, name)
+
+
+def _map_iterator(node):
+    """Yield each child node of the provided map node."""
+    schema, datum, _ = node
+    child_schema = schema.values
+    for child_name, child_datum in datum.items():  # type: ignore
+        yield ValidationNode(child_schema, child_datum, child_name)
+
+
+_ITERATORS = {
+    'record': _record_iterator,
+    'array': _array_iterator,
+    'map': _map_iterator,
+}
+_ITERATORS['error'] = _ITERATORS['request'] = _ITERATORS['record']
 
 
 #
@@ -954,8 +1014,7 @@
                               set_writers_schema)
 
     def write(self, datum, encoder):
-        if not validate(self.writers_schema, datum):
-            raise avro.errors.AvroTypeException(self.writers_schema, datum)
+        validate(self.writers_schema, datum, raise_on_error=True)
         self.write_data(self.writers_schema, datum, encoder)
 
     def write_data(self, writers_schema, datum, encoder):
diff --git a/lang/py/avro/schema.py b/lang/py/avro/schema.py
index de66894..5aa9977 100644
--- a/lang/py/avro/schema.py
+++ b/lang/py/avro/schema.py
@@ -42,11 +42,13 @@
 
 from __future__ import absolute_import, division, print_function
 
+import datetime
 import json
 import math
 import re
 import sys
 import warnings
+from decimal import Decimal
 
 import avro.errors
 from avro import constants
@@ -122,6 +124,11 @@
     'ignore',
 )
 
+INT_MIN_VALUE = -(1 << 31)
+INT_MAX_VALUE = (1 << 31) - 1
+LONG_MIN_VALUE = -(1 << 63)
+LONG_MAX_VALUE = (1 << 63) - 1
+
 
 def validate_basename(basename):
     """Raise InvalidName if the given basename is not a valid name."""
@@ -131,11 +138,15 @@
                 "does not match the pattern {!s}".format(
                     basename, _BASE_NAME_PATTERN.pattern))
 
+
+def _is_timezone_aware_datetime(dt):
+    return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None
+
+
 #
 # Base Classes
 #
 
-
 class Schema(object):
     """Base class for all Schema classes."""
     _props = None
@@ -202,6 +213,18 @@
         """
         raise NotImplemented("Must be implemented by subclasses.")
 
+    def validate(self, datum):
+        """Returns the appropriate schema object if datum is valid for that schema, else None.
+
+        Validation concerns only shape and type of data in the top level of the current schema.
+        In most cases, the returned schema object will be self. However, for UnionSchema objects,
+        the returned Schema will be the first branch schema for which validation passes.
+
+        @arg datum: The data to be checked for validity according to this schema
+        @return Optional[Schema]
+        """
+        raise Exception("Must be implemented by subclasses.")
+
 
 class Name(object):
     """Class to describe Avro name."""
@@ -475,6 +498,17 @@
 class PrimitiveSchema(Schema):
     """Valid primitive types are in PRIMITIVE_TYPES."""
 
+    _validators = {
+        'null': lambda x: x is None,
+        'boolean': lambda x: isinstance(x, bool),
+        'string': lambda x: isinstance(x, unicode),
+        'bytes': lambda x: isinstance(x, bytes),
+        'int': lambda x: isinstance(x, int) and INT_MIN_VALUE <= x <= INT_MAX_VALUE,
+        'long': lambda x: isinstance(x, int) and LONG_MIN_VALUE <= x <= LONG_MAX_VALUE,
+        'float': lambda x: isinstance(x, (int, float)),
+        'double': lambda x: isinstance(x, (int, float)),
+    }
+
     def __init__(self, type, other_props=None):
         # Ensure valid ctor args
         if type not in PRIMITIVE_TYPES:
@@ -503,6 +537,15 @@
         else:
             return self.props
 
+    def validate(self, datum):
+        """Return self if datum is a valid representation of this type of primitive schema, else None
+
+        @arg datum: The data to be checked for validity according to this schema
+        @return Schema object or None
+        """
+        validator = self._validators.get(self.type, lambda x: False)
+        return self if validator(datum) else None
+
     def __eq__(self, that):
         return self.props == that.props
 
@@ -525,6 +568,10 @@
     def to_json(self, names=None):
         return self.props
 
+    def validate(self, datum):
+        """Return self if datum is a Decimal object, else None."""
+        return self if isinstance(datum, Decimal) else None
+
     def __eq__(self, that):
         return self.props == that.props
 
@@ -565,6 +612,10 @@
             names.names[self.fullname] = self
             return names.prune_namespace(self.props)
 
+    def validate(self, datum):
+        """Return self if datum is a valid representation of this schema, else None."""
+        return self if isinstance(datum, bytes) and len(datum) == self.size else None
+
     def __eq__(self, that):
         return self.props == that.props
 
@@ -588,6 +639,10 @@
     def to_json(self, names=None):
         return self.props
 
+    def validate(self, datum):
+        """Return self if datum is a Decimal object, else None."""
+        return self if isinstance(datum, Decimal) else None
+
     def __eq__(self, that):
         return self.props == that.props
 
@@ -637,6 +692,10 @@
             names.names[self.fullname] = self
             return names.prune_namespace(self.props)
 
+    def validate(self, datum):
+        """Return self if datum is a valid member of this Enum, else None."""
+        return self if datum in self.symbols else None
+
     def __eq__(self, that):
         return self.props == that.props
 
@@ -656,7 +715,7 @@
         else:
             try:
                 items_schema = make_avsc_object(items, names)
-            except SchemaParseException as e:
+            except avro.errors.SchemaParseException as e:
                 fail_msg = 'Items schema (%s) not a valid Avro schema: %s (known names: %s)' % (items, e, names.names.keys())
                 raise avro.errors.SchemaParseException(fail_msg)
 
@@ -681,6 +740,10 @@
         to_dump['items'] = item_schema.to_json(names)
         return to_dump
 
+    def validate(self, datum):
+        """Return self if datum is a valid representation of this schema, else None."""
+        return self if isinstance(datum, list) else None
+
     def __eq__(self, that):
         to_cmp = json.loads(str(self))
         return to_cmp == json.loads(str(that))
@@ -697,7 +760,7 @@
         else:
             try:
                 values_schema = make_avsc_object(values, names)
-            except SchemaParseException:
+            except avro.errors.SchemaParseException:
                 raise
             except Exception:
                 raise avro.errors.SchemaParseException('Values schema is not a valid Avro schema.')
@@ -722,6 +785,10 @@
         to_dump['values'] = self.get_prop('values').to_json(names)
         return to_dump
 
+    def validate(self, datum):
+        """Return self if datum is a valid representation of this schema, else None."""
+        return self if isinstance(datum, dict) and all(isinstance(key, unicode) for key in datum) else None
+
     def __eq__(self, that):
         to_cmp = json.loads(str(self))
         return to_cmp == json.loads(str(that))
@@ -780,6 +847,12 @@
             to_dump.append(schema.to_json(names))
         return to_dump
 
+    def validate(self, datum):
+        """Return the first branch schema of which datum is a valid example, else None."""
+        for branch in self.schemas:
+            if branch.validate(datum) is not None:
+                return branch
+
     def __eq__(self, that):
         to_cmp = json.loads(str(self))
         return to_cmp == json.loads(str(that))
@@ -901,6 +974,10 @@
         to_dump['fields'] = [f.to_json(names) for f in self.fields]
         return to_dump
 
+    def validate(self, datum):
+        """Return self if datum is a valid representation of this schema, else None"""
+        return self if isinstance(datum, dict) and {f.name for f in self.fields}.issuperset(datum.keys()) else None
+
     def __eq__(self, that):
         to_cmp = json.loads(str(self))
         return to_cmp == json.loads(str(that))
@@ -918,6 +995,10 @@
     def to_json(self, names=None):
         return self.props
 
+    def validate(self, datum):
+        """Return self if datum is a valid date object, else None."""
+        return self if isinstance(datum, datetime.date) else None
+
     def __eq__(self, that):
         return self.props == that.props
 
@@ -934,6 +1015,10 @@
     def to_json(self, names=None):
         return self.props
 
+    def validate(self, datum):
+        """Return self if datum is a valid representation of this schema, else None."""
+        return self if isinstance(datum, datetime.time) else None
+
     def __eq__(self, that):
         return self.props == that.props
 
@@ -950,6 +1035,10 @@
     def to_json(self, names=None):
         return self.props
 
+    def validate(self, datum):
+        """Return self if datum is a valid representation of this schema, else None."""
+        return self if isinstance(datum, datetime.time) else None
+
     def __eq__(self, that):
         return self.props == that.props
 
@@ -966,6 +1055,9 @@
     def to_json(self, names=None):
         return self.props
 
+    def validate(self, datum):
+        return self if isinstance(datum, datetime.datetime) and _is_timezone_aware_datetime(datum) else None
+
     def __eq__(self, that):
         return self.props == that.props
 
@@ -982,6 +1074,9 @@
     def to_json(self, names=None):
         return self.props
 
+    def validate(self, datum):
+        return self if isinstance(datum, datetime.datetime) and _is_timezone_aware_datetime(datum) else None
+
     def __eq__(self, that):
         return self.props == that.props