Avro union support (#10505)

* Avro union support

* Document new union support

* Add support for AvroStreamInputFormat and fix checkstyle

* Extend multi-member union test schema and format

* Some additional docs and add Enums to spelling

* Rename explodeUnions -> extractUnions

* explode -> extract

* ByType

* Correct spelling error
diff --git a/docs/development/extensions-core/avro.md b/docs/development/extensions-core/avro.md
index a200e83..75704cf 100644
--- a/docs/development/extensions-core/avro.md
+++ b/docs/development/extensions-core/avro.md
@@ -39,13 +39,30 @@
 
 Druid supports most Avro types natively, there are however some exceptions which are detailed here.
 
-`union` types which aren't of the form `[null, otherType]` aren't supported at this time.
+#### Unions
+Druid has two modes for supporting `union` types.
 
+The default mode will treat unions as a single value regardless of the type it is populated with.
+
+If you wish to operate on each different member of a union however you can set `extractUnionsByType` on the Avro parser in which case unions will be expanded into nested objects according to the following rules:
+* Primitive types and unnamed complex types are keyed their type name. i.e `int`, `string`
+* Complex named types are keyed by their names, this includes `record`, `fixed` and `enum`.
+* The Avro null type is elided as its value can only ever be null
+
+This is safe because an Avro union can only contain a single member of each unnamed type and duplicates of the same named type are not allowed.
+i.e only a single array is allowed, multiple records (or other named types) are allowed as long as each has a unique name.
+
+The members can then be accessed using a [flattenSpec](../../ingestion/data-formats.md#flattenspec) similar other nested types.
+
+#### Binary types
 `bytes` and `fixed` Avro types will be returned by default as base64 encoded strings unless the `binaryAsString` option is enabled on the Avro parser.
 This setting will decode these types as UTF-8 strings.
 
+#### Enums
 `enum` types will be returned as `string` of the enum symbol.
 
+#### Complex types
 `record` and `map` types representing nested data can be ingested using [flattenSpec](../../ingestion/data-formats.md#flattenspec) on the parser.
 
-Druid doesn't currently support Avro logical types, they will be ignored and fields will be handled according to the underlying primitive type.
\ No newline at end of file
+#### Logical types
+Druid doesn't currently support Avro logical types, they will be ignored and fields will be handled according to the underlying primitive type.
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
index 39ce48e..165f6a4 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
@@ -34,18 +34,24 @@
 {
   private final ParseSpec parseSpec;
   private final boolean fromPigAvroStorage;
+  private final boolean binaryAsString;
+  private final boolean extractUnionsByType;
   private final ObjectFlattener<GenericRecord> avroFlattener;
   private final MapInputRowParser mapParser;
 
   @JsonCreator
   public AvroHadoopInputRowParser(
       @JsonProperty("parseSpec") ParseSpec parseSpec,
-      @JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage
+      @JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage,
+      @JsonProperty("binaryAsString") Boolean binaryAsString,
+      @JsonProperty("extractUnionsByType") Boolean extractUnionsByType
   )
   {
     this.parseSpec = parseSpec;
-    this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
-    this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false);
+    this.fromPigAvroStorage = fromPigAvroStorage != null && fromPigAvroStorage;
+    this.binaryAsString = binaryAsString != null && binaryAsString;
+    this.extractUnionsByType = extractUnionsByType != null && extractUnionsByType;
+    this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, this.binaryAsString, this.extractUnionsByType);
     this.mapParser = new MapInputRowParser(parseSpec);
   }
 
@@ -71,6 +77,6 @@
   @Override
   public InputRowParser withParseSpec(ParseSpec parseSpec)
   {
-    return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage);
+    return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage, binaryAsString, extractUnionsByType);
   }
 }
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
index f375c63..fcb8bd8 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
@@ -36,6 +36,8 @@
 public class AvroStreamInputRowParser implements ByteBufferInputRowParser
 {
   private final ParseSpec parseSpec;
+  private final Boolean binaryAsString;
+  private final Boolean extractUnionsByType;
   private final AvroBytesDecoder avroBytesDecoder;
   private final ObjectFlattener<GenericRecord> avroFlattener;
   private final MapInputRowParser mapParser;
@@ -43,12 +45,16 @@
   @JsonCreator
   public AvroStreamInputRowParser(
       @JsonProperty("parseSpec") ParseSpec parseSpec,
-      @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder
+      @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder,
+      @JsonProperty("binaryAsString") Boolean binaryAsString,
+      @JsonProperty("extractUnionsByType") Boolean extractUnionsByType
   )
   {
     this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
     this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
-    this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false);
+    this.binaryAsString = binaryAsString != null && binaryAsString;
+    this.extractUnionsByType = extractUnionsByType != null && extractUnionsByType;
+    this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, this.binaryAsString, this.extractUnionsByType);
     this.mapParser = new MapInputRowParser(parseSpec);
   }
 
@@ -76,7 +82,9 @@
   {
     return new AvroStreamInputRowParser(
         parseSpec,
-        avroBytesDecoder
+        avroBytesDecoder,
+        binaryAsString,
+        extractUnionsByType
     );
   }
 
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
index 8f64532..8526a04 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
@@ -44,13 +44,8 @@
 
 public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<GenericRecord>
 {
-  private static final JsonProvider AVRO_JSON_PROVIDER = new GenericAvroJsonProvider();
-  private static final Configuration JSONPATH_CONFIGURATION =
-      Configuration.builder()
-                   .jsonProvider(AVRO_JSON_PROVIDER)
-                   .mappingProvider(new NotImplementedMappingProvider())
-                   .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
-                   .build();
+  private final JsonProvider avroJsonProvider;
+  private final Configuration jsonPathConfiguration;
 
   private static final EnumSet<Schema.Type> ROOT_TYPES = EnumSet.of(
       Schema.Type.STRING,
@@ -99,10 +94,18 @@
    * @param fromPigAvroStorage boolean to specify the data file is stored using AvroStorage
    * @param binaryAsString boolean to encode the byte[] as a string.
    */
-  public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString)
+  public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString, final boolean extractUnionsByType)
   {
     this.fromPigAvroStorage = fromPigAvroStorage;
     this.binaryAsString = binaryAsString;
+
+    this.avroJsonProvider = new GenericAvroJsonProvider(extractUnionsByType);
+    this.jsonPathConfiguration =
+        Configuration.builder()
+                     .jsonProvider(avroJsonProvider)
+                     .mappingProvider(new NotImplementedMappingProvider())
+                     .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
+                     .build();
   }
 
   @Override
@@ -126,7 +129,7 @@
   public Function<GenericRecord, Object> makeJsonPathExtractor(final String expr)
   {
     final JsonPath jsonPath = JsonPath.compile(expr);
-    return record -> transformValue(jsonPath.read(record, JSONPATH_CONFIGURATION));
+    return record -> transformValue(jsonPath.read(record, jsonPathConfiguration));
   }
 
   @Override
@@ -138,7 +141,7 @@
   @Override
   public JsonProvider getJsonProvider()
   {
-    return AVRO_JSON_PROVIDER;
+    return avroJsonProvider;
   }
 
   @Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
index 90c74f6..7417477 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
@@ -42,6 +42,7 @@
   private static final Logger LOGGER = new Logger(AvroOCFInputFormat.class);
 
   private final boolean binaryAsString;
+  private final boolean extractUnionsByType;
   @Nullable
   private final Schema readerSchema;
 
@@ -50,7 +51,8 @@
       @JacksonInject @Json ObjectMapper mapper,
       @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
       @JsonProperty("schema") @Nullable Map<String, Object> schema,
-      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
+      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
+      @JsonProperty("extractUnionsByType") @Nullable Boolean extractUnionsByType
   ) throws Exception
   {
     super(flattenSpec);
@@ -62,7 +64,8 @@
     } else {
       this.readerSchema = null;
     }
-    this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+    this.binaryAsString = binaryAsString != null && binaryAsString;
+    this.extractUnionsByType = extractUnionsByType != null && extractUnionsByType;
   }
 
   @Override
@@ -82,7 +85,8 @@
         temporaryDirectory,
         readerSchema,
         getFlattenSpec(),
-        binaryAsString
+        binaryAsString,
+        extractUnionsByType
     );
   }
 
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
index 97ab895..dc2a6d7 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
@@ -56,14 +56,15 @@
       File temporaryDirectory,
       @Nullable Schema readerSchema,
       JSONPathSpec flattenSpec,
-      boolean binaryAsString
+      boolean binaryAsString,
+      boolean extractUnionsByType
   )
   {
     this.inputRowSchema = inputRowSchema;
     this.source = source;
     this.temporaryDirectory = temporaryDirectory;
     this.readerSchema = readerSchema;
-    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString));
+    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString, extractUnionsByType));
   }
 
   @Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
index 9d5665e..6d399a8 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
@@ -39,7 +39,8 @@
   public static ObjectFlattener<GenericRecord> makeFlattener(
       final ParseSpec parseSpec,
       final boolean fromPigAvroStorage,
-      final boolean binaryAsString
+      final boolean binaryAsString,
+      final boolean extractUnionsByType
   )
   {
     final JSONPathSpec flattenSpec;
@@ -49,7 +50,7 @@
       flattenSpec = JSONPathSpec.DEFAULT;
     }
 
-    return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString));
+    return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString, extractUnionsByType));
   }
 
   public static List<InputRow> parseGenericRecord(
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
index 3b59b36..d42f359 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
@@ -35,6 +35,7 @@
 public class AvroStreamInputFormat extends NestedInputFormat
 {
   private final boolean binaryAsString;
+  private final boolean extractUnionsByType;
 
   private final AvroBytesDecoder avroBytesDecoder;
 
@@ -42,12 +43,14 @@
   public AvroStreamInputFormat(
       @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
       @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder,
-      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
+      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
+      @JsonProperty("extractUnionsByType") @Nullable Boolean extractUnionsByType
   )
   {
     super(flattenSpec);
     this.avroBytesDecoder = avroBytesDecoder;
-    this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+    this.binaryAsString = binaryAsString != null && binaryAsString;
+    this.extractUnionsByType = extractUnionsByType != null && extractUnionsByType;
   }
 
   @Override
@@ -76,7 +79,8 @@
         source,
         avroBytesDecoder,
         getFlattenSpec(),
-        binaryAsString
+        binaryAsString,
+        extractUnionsByType
     );
   }
 
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
index d8d92eb..c04ec14 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
@@ -52,13 +52,14 @@
       InputEntity source,
       AvroBytesDecoder avroBytesDecoder,
       JSONPathSpec flattenSpec,
-      boolean binaryAsString
+      boolean binaryAsString,
+      boolean extractUnionsByType
   )
   {
     this.inputRowSchema = inputRowSchema;
     this.source = source;
     this.avroBytesDecoder = avroBytesDecoder;
-    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString));
+    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString, extractUnionsByType));
   }
 
   @Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
index ab6a53e..86bbb7d 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
@@ -19,15 +19,19 @@
 
 package org.apache.druid.data.input.avro;
 
+import com.google.common.collect.ImmutableMap;
 import com.jayway.jsonpath.InvalidJsonException;
 import com.jayway.jsonpath.spi.json.JsonProvider;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
 
 import javax.annotation.Nullable;
 
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -41,6 +45,13 @@
  */
 public class GenericAvroJsonProvider implements JsonProvider
 {
+  private final boolean extractUnionsByType;
+
+  GenericAvroJsonProvider(final boolean extractUnionsByType)
+  {
+    this.extractUnionsByType = extractUnionsByType;
+  }
+
   @Override
   public Object parse(final String s) throws InvalidJsonException
   {
@@ -148,7 +159,11 @@
     if (o == null) {
       return null;
     } else if (o instanceof GenericRecord) {
-      return ((GenericRecord) o).get(s);
+      final GenericRecord record = (GenericRecord) o;
+      if (extractUnionsByType && isExtractableUnion(record.getSchema().getField(s))) {
+        return extractUnionTypes(record.get(s));
+      }
+      return record.get(s);
     } else if (o instanceof Map) {
       final Map theMap = (Map) o;
       if (theMap.containsKey(s)) {
@@ -195,4 +210,46 @@
   {
     return o;
   }
+
+  private boolean isExtractableUnion(final Schema.Field field)
+  {
+    return field.schema().isUnion() &&
+           field.schema().getTypes().stream().filter(type -> type.getType() != Schema.Type.NULL).count() > 1;
+  }
+
+  private Map<String, Object> extractUnionTypes(final Object o)
+  {
+    // Primitive types and unnamped complex types are keyed their type name.
+    // Complex named types are keyed by their names.
+    // This is safe because an Avro union can only contain a single member of each unnamed type and duplicates
+    // of the same named type are not allowed. i.e only a single array is allowed, multiple records are allowed as
+    // long as each has a unique name.
+    // The Avro null type is elided as it's value can only ever be null
+    if (o instanceof Integer) {
+      return ImmutableMap.of("int", o);
+    } else if (o instanceof Long) {
+      return ImmutableMap.of("long", o);
+    } else if (o instanceof Float) {
+      return ImmutableMap.of("float", o);
+    } else if (o instanceof Double) {
+      return ImmutableMap.of("double", o);
+    } else if (o instanceof Boolean) {
+      return ImmutableMap.of("boolean", o);
+    } else if (o instanceof Utf8) {
+      return ImmutableMap.of("string", o);
+    } else if (o instanceof ByteBuffer) {
+      return ImmutableMap.of("bytes", o);
+    } else if (o instanceof Map) {
+      return ImmutableMap.of("map", o);
+    } else if (o instanceof List) {
+      return ImmutableMap.of("array", o);
+    } else if (o instanceof GenericRecord) {
+      return ImmutableMap.of(((GenericRecord) o).getSchema().getName(), o);
+    } else if (o instanceof GenericFixed) {
+      return ImmutableMap.of(((GenericFixed) o).getSchema().getName(), o);
+    } else if (o instanceof GenericEnumSymbol) {
+      return ImmutableMap.of(((GenericEnumSymbol<?>) o).getSchema().getName(), o);
+    }
+    return ImmutableMap.of();
+  }
 }
diff --git a/extensions-core/avro-extensions/src/test/avro/some-datum.avsc b/extensions-core/avro-extensions/src/test/avro/some-datum.avsc
index bf8ea68..f03cb70 100644
--- a/extensions-core/avro-extensions/src/test/avro/some-datum.avsc
+++ b/extensions-core/avro-extensions/src/test/avro/some-datum.avsc
@@ -1,38 +1,181 @@
-[{
-  "namespace": "org.apache.druid.data.input",
-	"name": "SomeAvroDatum",
-	"type": "record",
-	"fields" : [
-		{"name":"timestamp","type":"long"},
-		{"name":"eventType","type":"string"},
-		{"name":"id","type":"long"},
-		{"name":"someOtherId","type":"long"},
-		{"name":"isValid","type":"boolean"},
-		{"name":"someIntArray","type":{"type":"array","items":"int"}},
-		{"name":"someStringArray","type":{"type":"array","items":"string"}},
-		{"name":"someIntValueMap","type":{"type":"map","values":"int"}},
-		{"name":"someStringValueMap","type":{"type":"map","values":"string"}},
-		{"name":"someUnion","type":["null","string"]},
-		{"name":"someNull","type":"null"},
-    {"name":"someFixed","type":{"type":"fixed","size":16,"name":"MyFixed"}},
-    {"name":"someBytes","type":"bytes"},
-    {"name":"someEnum","type":{"type":"enum","name":"MyEnum","symbols":["ENUM0","ENUM1","ENUM2"]}},
-    {"name":"someRecord","type":{
-      "type":"record","name":"MySubRecord","fields":[
-        {"name":"subInt","type":"int"},
-        {"name":"subLong","type":"long"}
-      ]
-    }},
-
-   	{"name":"someLong","type":"long"},
-		{"name":"someInt","type":"int"},
-		{"name":"someFloat","type":"float"},
-		{"name":"someRecordArray","type":{
-		  "type":"array","items":{
-		   "type":"record","name":"MyNestedRecord","fields":[
-		    {"name":"nestedString","type":"string"}
+[
+  {
+    "namespace": "org.apache.druid.data.input",
+    "name": "SomeAvroDatum",
+    "type": "record",
+    "fields": [
+      {
+        "name": "timestamp",
+        "type": "long"
+      },
+      {
+        "name": "eventType",
+        "type": "string"
+      },
+      {
+        "name": "id",
+        "type": "long"
+      },
+      {
+        "name": "someOtherId",
+        "type": "long"
+      },
+      {
+        "name": "isValid",
+        "type": "boolean"
+      },
+      {
+        "name": "someIntArray",
+        "type": {
+          "type": "array",
+          "items": "int"
+        }
+      },
+      {
+        "name": "someStringArray",
+        "type": {
+          "type": "array",
+          "items": "string"
+        }
+      },
+      {
+        "name": "someIntValueMap",
+        "type": {
+          "type": "map",
+          "values": "int"
+        }
+      },
+      {
+        "name": "someStringValueMap",
+        "type": {
+          "type": "map",
+          "values": "string"
+        }
+      },
+      {
+        "name": "someUnion",
+        "type": [
+          "null",
+          "string"
         ]
-       }
-     }}
-	]
-}]
+      },
+      {
+        "name": "someMultiMemberUnion",
+        "type": [
+          "int",
+          "long",
+          "float",
+          "double",
+          "boolean",
+          "string",
+          "bytes",
+          {
+            "type": "map",
+            "values": "string"
+          },
+          {
+            "type": "array",
+            "items": "int"
+          },
+          {
+            "type": "record",
+            "name": "UnionSubRecord",
+            "fields": [
+              {
+                "name": "subString",
+                "type": "string"
+              }
+            ]
+          },
+          {
+            "name": "UnionSubFixed",
+            "type": "fixed",
+            "size": 10
+          },
+          {
+            "type": "enum",
+            "name": "UnionSubEnum",
+            "symbols": [
+              "ENUM0",
+              "ENUM1",
+              "ENUM2"
+            ]
+          }
+        ]
+      },
+      {
+        "name": "someNull",
+        "type": "null"
+      },
+      {
+        "name": "someFixed",
+        "type": {
+          "type": "fixed",
+          "size": 16,
+          "name": "MyFixed"
+        }
+      },
+      {
+        "name": "someBytes",
+        "type": "bytes"
+      },
+      {
+        "name": "someEnum",
+        "type": {
+          "type": "enum",
+          "name": "MyEnum",
+          "symbols": [
+            "ENUM0",
+            "ENUM1",
+            "ENUM2"
+          ]
+        }
+      },
+      {
+        "name": "someRecord",
+        "type": {
+          "type": "record",
+          "name": "MySubRecord",
+          "fields": [
+            {
+              "name": "subInt",
+              "type": "int"
+            },
+            {
+              "name": "subLong",
+              "type": "long"
+            }
+          ]
+        }
+      },
+      {
+        "name": "someLong",
+        "type": "long"
+      },
+      {
+        "name": "someInt",
+        "type": "int"
+      },
+      {
+        "name": "someFloat",
+        "type": "float"
+      },
+      {
+        "name": "someRecordArray",
+        "type": {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "MyNestedRecord",
+            "fields": [
+              {
+                "name": "nestedString",
+                "type": "string"
+              }
+            ]
+          }
+        }
+      }
+    ]
+  }
+]
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
index 6e3cdb0..c6ade85 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
@@ -62,7 +62,7 @@
 
   private void testParse(GenericRecord record, boolean fromPigAvroStorage) throws IOException
   {
-    AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, fromPigAvroStorage);
+    AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, fromPigAvroStorage, false, false);
     AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
         jsonMapper.writeValueAsBytes(parser),
         AvroHadoopInputRowParser.class
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
index 139616b..918aa0a 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
@@ -118,6 +118,7 @@
     AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
         flattenSpec,
         new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        false,
         false
     );
     NestedInputFormat inputFormat2 = jsonMapper.readValue(
@@ -134,6 +135,7 @@
     AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
         flattenSpec,
         new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null),
+        false,
         false
     );
     NestedInputFormat inputFormat2 = jsonMapper.readValue(
@@ -150,6 +152,7 @@
     AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
         flattenSpec,
         new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        false,
         false
     );
     NestedInputFormat inputFormat2 = jsonMapper.readValue(
@@ -193,6 +196,7 @@
     AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
         flattenSpec,
         new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        false,
         false
     );
     NestedInputFormat inputFormat2 = jsonMapper.readValue(
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
index 0dbb6b9..8ed9f44 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
@@ -151,6 +151,7 @@
       }
   );
   private static final String SOME_UNION_VALUE = "string as union";
+  private static final Integer SOME_UNION_MEMBER_VALUE = 1;
   private static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8);
   private static final String SOME_RECORD_STRING_VALUE = "string in record";
   private static final List<MyNestedRecord> SOME_RECORD_ARRAY_VALUE = Collections.singletonList(MyNestedRecord.newBuilder()
@@ -176,7 +177,9 @@
     Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
     AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
         PARSE_SPEC,
-        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        false,
+        false
     );
     ByteBufferInputRowParser parser2 = jsonMapper.readValue(
         jsonMapper.writeValueAsString(parser),
@@ -193,7 +196,9 @@
     Repository repository = new InMemoryRepository(null);
     AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
         PARSE_SPEC,
-        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        false,
+        false
     );
     ByteBufferInputRowParser parser2 = jsonMapper.readValue(
         jsonMapper.writeValueAsString(parser),
@@ -234,7 +239,9 @@
     Repository repository = new InMemoryRepository(null);
     AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
         PARSE_SPEC_SCHEMALESS,
-        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        false,
+        false
     );
     ByteBufferInputRowParser parser2 = jsonMapper.readValue(
         jsonMapper.writeValueAsString(parser),
@@ -354,7 +361,7 @@
     Assert.assertEquals(SOME_INT_VALUE, inputRow.getMetric("someInt"));
   }
 
-  public static SomeAvroDatum buildSomeAvroDatum()
+  private static SomeAvroDatum.Builder createSomeAvroDatumBuilderDefaults()
   {
     return SomeAvroDatum.newBuilder()
                         .setTimestamp(DATE_TIME.toInstant().toEpochMilli())
@@ -370,12 +377,24 @@
                         .setSomeIntValueMap(SOME_INT_VALUE_MAP_VALUE)
                         .setSomeStringValueMap(SOME_STRING_VALUE_MAP_VALUE)
                         .setSomeUnion(SOME_UNION_VALUE)
+                        .setSomeMultiMemberUnion(SOME_UNION_MEMBER_VALUE)
                         .setSomeFixed(SOME_FIXED_VALUE)
                         .setSomeBytes(SOME_BYTES_VALUE)
                         .setSomeNull(null)
                         .setSomeEnum(MyEnum.ENUM1)
                         .setSomeRecord(SOME_RECORD_VALUE)
-                        .setSomeRecordArray(SOME_RECORD_ARRAY_VALUE)
-                        .build();
+                        .setSomeRecordArray(SOME_RECORD_ARRAY_VALUE);
+  }
+
+  public static SomeAvroDatum buildSomeAvroDatum()
+  {
+    return createSomeAvroDatumBuilderDefaults().build();
+  }
+
+  public static SomeAvroDatum buildSomeAvroDatumWithUnionValue(Object unionValue)
+  {
+    return createSomeAvroDatumBuilderDefaults()
+        .setSomeMultiMemberUnion(unionValue)
+        .build();
   }
 }
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
index 0761672..e064e7a 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
@@ -19,11 +19,19 @@
 
 package org.apache.druid.data.input.avro;
 
+import org.apache.avro.util.Utf8;
 import org.apache.druid.data.input.AvroStreamInputRowParserTest;
 import org.apache.druid.data.input.SomeAvroDatum;
+import org.apache.druid.data.input.UnionSubEnum;
+import org.apache.druid.data.input.UnionSubFixed;
+import org.apache.druid.data.input.UnionSubRecord;
 import org.junit.Assert;
 import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 public class AvroFlattenerMakerTest
@@ -33,7 +41,7 @@
   public void getRootField()
   {
     final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
-    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false);
+    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, true);
 
     Assert.assertEquals(
         record.getTimestamp(),
@@ -120,7 +128,7 @@
   public void makeJsonPathExtractor()
   {
     final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
-    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false);
+    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, true);
 
     Assert.assertEquals(
         record.getTimestamp(),
@@ -163,6 +171,10 @@
         flattener.makeJsonPathExtractor("$.someUnion").apply(record)
     );
     Assert.assertEquals(
+        record.getSomeMultiMemberUnion(),
+        flattener.makeJsonPathExtractor("$.someMultiMemberUnion.int").apply(record)
+    );
+    Assert.assertEquals(
         record.getSomeNull(),
         flattener.makeJsonPathExtractor("$.someNull").apply(record)
     );
@@ -219,11 +231,75 @@
     );
   }
 
+  @Test
+  public void jsonPathExtractorExtractUnionsByType()
+  {
+    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, true);
+
+    // Unmamed types are accessed by type
+
+    // int
+    Assert.assertEquals(1, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.int").apply(
+        AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(1)));
+
+    // long
+    Assert.assertEquals(1L, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.long").apply(
+        AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(1L)));
+
+    // float
+    Assert.assertEquals((float) 1.0, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.float").apply(
+        AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue((float) 1.0)));
+
+    // double
+    Assert.assertEquals(1.0, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.double").apply(
+        AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(1.0)));
+
+    // string
+    Assert.assertEquals("string", flattener.makeJsonPathExtractor("$.someMultiMemberUnion.string").apply(
+        AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(new Utf8("string"))));
+
+    // bytes
+    Assert.assertArrayEquals(new byte[] {1}, (byte[]) flattener.makeJsonPathExtractor("$.someMultiMemberUnion.bytes").apply(
+        AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(ByteBuffer.wrap(new byte[] {1}))));
+
+    // map
+    Assert.assertEquals(2, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.map.two").apply(
+        AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(new HashMap<String, Integer>() {{
+            put("one", 1);
+            put("two", 2);
+            put("three", 3);
+          }
+        }
+        )));
+
+    // array
+    Assert.assertEquals(3, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.array[2]").apply(
+        AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(Arrays.asList(1, 2, 3))));
+
+    // Named types are accessed by name
+
+    // record
+    Assert.assertEquals("subRecordString", flattener.makeJsonPathExtractor("$.someMultiMemberUnion.UnionSubRecord.subString").apply(
+        AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(
+            UnionSubRecord.newBuilder()
+                          .setSubString("subRecordString")
+                          .build())));
+
+    // fixed
+    final byte[] fixedBytes = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+    Assert.assertEquals(fixedBytes, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.UnionSubFixed").apply(
+        AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(new UnionSubFixed(fixedBytes))));
+
+    // enum
+    Assert.assertEquals(String.valueOf(UnionSubEnum.ENUM1), flattener.makeJsonPathExtractor("$.someMultiMemberUnion.UnionSubEnum").apply(
+        AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(UnionSubEnum.ENUM1)));
+  }
+
   @Test(expected = UnsupportedOperationException.class)
   public void makeJsonQueryExtractor()
   {
     final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
-    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false);
+    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, false);
 
     Assert.assertEquals(
         record.getTimestamp(),
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
index 4841483..b00c1e4 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
@@ -144,7 +144,7 @@
       Assert.assertFalse(iterator.hasNext());
       final Map<String, Object> rawColumns = row.getRawValues();
       Assert.assertNotNull(rawColumns);
-      Assert.assertEquals(19, rawColumns.size());
+      Assert.assertEquals(20, rawColumns.size());
       final List<InputRow> inputRows = row.getInputRows();
       Assert.assertNotNull(inputRows);
       final InputRow inputRow = Iterables.getOnlyElement(inputRows);
@@ -201,7 +201,7 @@
     final DimensionsSpec dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
         "eventType")));
 
-    final AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(mapper, null, readerSchema, null);
+    final AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(mapper, null, readerSchema, null, null);
     final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, ColumnsFilter.all());
     final FileEntity entity = new FileEntity(someAvroFile);
     return inputFormat.createReader(schema, entity, temporaryFolder.newFolder());
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
index c8e9a45..796891e 100755
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
@@ -51,6 +51,7 @@
 {
   private final ParseSpec parseSpec;
   private final boolean binaryAsString;
+  private final boolean extractUnionsByType;
   private final TimestampSpec timestampSpec;
   private final ObjectFlattener<GenericRecord> recordFlattener;
   private final List<String> dimensions;
@@ -58,13 +59,15 @@
   @JsonCreator
   public ParquetAvroHadoopInputRowParser(
       @JsonProperty("parseSpec") ParseSpec parseSpec,
-      @JsonProperty("binaryAsString") Boolean binaryAsString
+      @JsonProperty("binaryAsString") Boolean binaryAsString,
+      @JsonProperty("extractUnionsByType") Boolean extractUnionsByType
   )
   {
     this.parseSpec = parseSpec;
     this.timestampSpec = parseSpec.getTimestampSpec();
     this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
-    this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+    this.binaryAsString = binaryAsString != null && binaryAsString;
+    this.extractUnionsByType = extractUnionsByType != null && extractUnionsByType;
 
     final JSONPathSpec flattenSpec;
     if (parseSpec instanceof AvroParseSpec) {
@@ -75,7 +78,7 @@
 
     this.recordFlattener = ObjectFlatteners.create(
         flattenSpec,
-        new AvroFlattenerMaker(false, this.binaryAsString)
+        new AvroFlattenerMaker(false, this.binaryAsString, this.extractUnionsByType)
     );
   }
 
@@ -131,6 +134,6 @@
   @Override
   public InputRowParser withParseSpec(ParseSpec parseSpec)
   {
-    return new ParquetAvroHadoopInputRowParser(parseSpec, binaryAsString);
+    return new ParquetAvroHadoopInputRowParser(parseSpec, binaryAsString, extractUnionsByType);
   }
 }
diff --git a/website/.spelling b/website/.spelling
index dd240c5..a0a1025 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -74,6 +74,7 @@
 EMRFS
 ETL
 Elasticsearch
+Enums
 FirehoseFactory
 FlattenSpec
 Float.NEGATIVE_INFINITY