Fix Avro support in Web Console (#10232)

* Fix Avro OCF detection prefix and run format detection on raw input

* Support Avro Fixed and Enum types correctly

* Check Avro version byte in format detection

* Add test for AvroOCFReader.sample

Ensures that the Sampler doesn't receive raw input that it can't
serialize into JSON.

* Document Avro type handling

* Add TS unit tests for guessInputFormat
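
For context on the detection fix: the console previously matched the literal string `Obj1`, but the Avro OCF magic header is the ASCII bytes `Obj` followed by the version byte `0x01`. A minimal TypeScript sketch of the corrected check (the standalone helper name is illustrative; in the console this logic lives inside `guessInputFormat`):

```ts
// The OCF magic is "Obj" plus the version byte 0x01 — not the literal
// string "Obj1", whose fourth character is 0x31.
function looksLikeAvroOcf(sampleDatum: string): boolean {
  return sampleDatum.startsWith('Obj') && sampleDatum.charCodeAt(3) === 1;
}

// looksLikeAvroOcf('Obj\x01lol') === true   -> detected as avro_ocf
// looksLikeAvroOcf('Obj1lol')    === false  -> falls through to regex
```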
diff --git a/docs/development/extensions-core/avro.md b/docs/development/extensions-core/avro.md
index 3ec4c70..a200e83 100644
--- a/docs/development/extensions-core/avro.md
+++ b/docs/development/extensions-core/avro.md
@@ -29,4 +29,23 @@
 See [Avro Hadoop Parser](../../ingestion/data-formats.md#avro-hadoop-parser) and [Avro Stream Parser](../../ingestion/data-formats.md#avro-stream-parser)
 for more details about how to use these in an ingestion spec.
 
-Make sure to [include](../../development/extensions.md#loading-extensions) `druid-avro-extensions` as an extension.
\ No newline at end of file
+Additionally, the extension provides an InputFormat for reading Avro OCF files when using
+[native batch indexing](../../ingestion/native-batch.md). See [Avro OCF](../../ingestion/data-formats.md#avro-ocf)
+for details on how to ingest OCF files.
+
+Make sure to [include](../../development/extensions.md#loading-extensions) `druid-avro-extensions` as an extension.
+
+### Avro Types
+
+Druid supports most Avro types natively; the exceptions are detailed below.
+
+`union` types that aren't of the form `[null, otherType]` aren't supported at this time.
+
+`bytes` and `fixed` Avro types are returned by default as base64-encoded strings unless the `binaryAsString` option is enabled on the Avro parser.
+When that option is enabled, these types are decoded as UTF-8 strings instead.
+
+`enum` types are returned as a `string` of the enum symbol.
+
+`record` and `map` types representing nested data can be ingested using [flattenSpec](../../ingestion/data-formats.md#flattenspec) on the parser.
+
+Druid doesn't currently support Avro logical types; they are ignored and fields are handled according to the underlying primitive type.
\ No newline at end of file
diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index 45562f4..21adee1 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -227,6 +227,8 @@
 
 > You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro OCF input format.
 
+> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid.
+
 The `inputFormat` to load data of Avro OCF format. An example is:
 ```json
 "ioConfig": {
@@ -343,6 +345,8 @@
 
 > You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Hadoop Parser.
 
+> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid.
+
 This parser is for [Hadoop batch ingestion](./hadoop.md).
 The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"org.apache.druid.data.input.avro.AvroValueInputFormat"`.
 You may want to set Avro reader's schema in `jobProperties` in `tuningConfig`,
@@ -865,6 +869,8 @@
 
 > You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream Parser.
 
+> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid.
+
 This parser is for [stream ingestion](./index.md#streaming) and reads Avro data from a stream directly.
 
 | Field | Type | Description | Required |
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
index e5c0a04..8f64532 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
@@ -25,7 +25,9 @@
 import com.jayway.jsonpath.Option;
 import com.jayway.jsonpath.spi.json.JsonProvider;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.druid.java.util.common.StringUtils;
@@ -56,7 +58,9 @@
       Schema.Type.INT,
       Schema.Type.LONG,
       Schema.Type.FLOAT,
-      Schema.Type.DOUBLE
+      Schema.Type.DOUBLE,
+      Schema.Type.ENUM,
+      Schema.Type.FIXED
   );
 
   private static boolean isPrimitive(Schema schema)
@@ -137,9 +141,15 @@
     return AVRO_JSON_PROVIDER;
   }
 
+  @Override
+  public Object finalizeConversionForMap(Object o)
+  {
+    return transformValue(o);
+  }
+
   private Object transformValue(final Object field)
   {
-    if (fromPigAvroStorage && field instanceof GenericData.Array) {
+    if (fromPigAvroStorage && field instanceof GenericArray) {
       return Lists.transform((List) field, item -> String.valueOf(((GenericRecord) item).get(0)));
     }
     if (field instanceof ByteBuffer) {
@@ -152,6 +162,14 @@
       return field.toString();
     } else if (field instanceof List) {
       return ((List<?>) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
+    } else if (field instanceof GenericEnumSymbol) {
+      return field.toString();
+    } else if (field instanceof GenericFixed) {
+      if (binaryAsString) {
+        return StringUtils.fromUtf8(((GenericFixed) field).bytes());
+      } else {
+        return ((GenericFixed) field).bytes();
+      }
     }
     return field;
   }
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
index 4572ec1..0dbb6b9 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
@@ -19,10 +19,8 @@
 
 package org.apache.druid.data.input;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
 import com.google.common.base.Function;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
@@ -40,6 +38,7 @@
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper;
 import org.apache.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -90,10 +89,12 @@
       "someStringArray",
       "someIntArray",
       "someFloat",
-      "someUnion",
       EVENT_TYPE,
-      ID,
+      "someFixed",
       "someBytes",
+      "someUnion",
+      ID,
+      "someEnum",
       "someLong",
       "someInt",
       "timestamp"
@@ -158,14 +159,12 @@
                                                                                                               .build());
   private static final Pattern BRACES_AND_SPACE = Pattern.compile("[{} ]");
 
-  private final ObjectMapper jsonMapper = new ObjectMapper();
+  private final ObjectMapper jsonMapper = new DefaultObjectMapper();
 
 
   @Before
   public void before()
   {
-    jsonMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
-    jsonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) {
       jsonMapper.registerModule(jacksonModule);
     }
@@ -335,7 +334,10 @@
     );
     Assert.assertEquals(Collections.singletonList(SOME_UNION_VALUE), inputRow.getDimension("someUnion"));
     Assert.assertEquals(Collections.emptyList(), inputRow.getDimension("someNull"));
-    Assert.assertEquals(SOME_FIXED_VALUE, inputRow.getRaw("someFixed"));
+    Assert.assertEquals(
+        Arrays.toString(SOME_FIXED_VALUE.bytes()),
+        Arrays.toString((byte[]) (inputRow.getRaw("someFixed")))
+    );
     Assert.assertEquals(
         Arrays.toString(SOME_BYTES_VALUE.array()),
         Arrays.toString((byte[]) (inputRow.getRaw("someBytes")))
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
index 6becdf7..0761672 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
@@ -80,7 +80,8 @@
         flattener.getRootField(record, "someNull")
     );
     Assert.assertEquals(
-        record.getSomeFixed(),
+        // Converted to a byte array by transformValue
+        record.getSomeFixed().bytes(),
         flattener.getRootField(record, "someFixed")
     );
     Assert.assertEquals(
@@ -89,7 +90,8 @@
         flattener.getRootField(record, "someBytes")
     );
     Assert.assertEquals(
-        record.getSomeEnum(),
+        // Converted to a string by transformValue
+        record.getSomeEnum().toString(),
         flattener.getRootField(record, "someEnum")
     );
     Assert.assertEquals(
@@ -165,7 +167,8 @@
         flattener.makeJsonPathExtractor("$.someNull").apply(record)
     );
     Assert.assertEquals(
-        record.getSomeFixed(),
+        // Converted to a byte array by transformValue
+        record.getSomeFixed().bytes(),
         flattener.makeJsonPathExtractor("$.someFixed").apply(record)
     );
     Assert.assertEquals(
@@ -174,7 +177,8 @@
         flattener.makeJsonPathExtractor("$.someBytes").apply(record)
     );
     Assert.assertEquals(
-        record.getSomeEnum(),
+        // Converted to a string by transformValue
+        record.getSomeEnum().toString(),
         flattener.makeJsonPathExtractor("$.someEnum").apply(record)
     );
     Assert.assertEquals(
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
index fe6070b..da586d8 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
@@ -29,6 +29,7 @@
 import org.apache.druid.data.input.AvroStreamInputRowParserTest;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.FileEntity;
@@ -128,18 +129,66 @@
     }
   }
 
+  @Test
+  public void testSample() throws Exception
+  {
+    final ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
+    );
+    final InputEntityReader reader = createReader(mapper, null);
+    try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
+      Assert.assertTrue(iterator.hasNext());
+      final InputRowListPlusRawValues row = iterator.next();
+      Assert.assertFalse(iterator.hasNext());
+      final Map<String, Object> rawColumns = row.getRawValues();
+      Assert.assertNotNull(rawColumns);
+      Assert.assertEquals(19, rawColumns.size());
+      final List<InputRow> inputRows = row.getInputRows();
+      Assert.assertNotNull(inputRows);
+      final InputRow inputRow = Iterables.getOnlyElement(inputRows);
+      assertInputRow(inputRow);
+    }
+  }
+
+  @Test
+  public void testSampleSerdeRaw() throws Exception
+  {
+    final ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
+    );
+    final InputEntityReader reader = createReader(mapper, null);
+    try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
+      Assert.assertTrue(iterator.hasNext());
+      final InputRowListPlusRawValues row = iterator.next();
+      Assert.assertFalse(iterator.hasNext());
+      final List<InputRow> inputRows = row.getInputRows();
+      Assert.assertNotNull(inputRows);
+      final InputRow inputRow = Iterables.getOnlyElement(inputRows);
+      assertInputRow(inputRow);
+      // Ensure the raw values can be serialized into JSON
+      mapper.writeValueAsString(row.getRawValues());
+    }
+  }
+
   private void assertRow(InputEntityReader reader) throws IOException
   {
     try (CloseableIterator<InputRow> iterator = reader.read()) {
       Assert.assertTrue(iterator.hasNext());
       final InputRow row = iterator.next();
-      Assert.assertEquals(DateTimes.of("2015-10-25T19:30:00.000Z"), row.getTimestamp());
-      Assert.assertEquals("type-a", Iterables.getOnlyElement(row.getDimension("eventType")));
-      Assert.assertEquals(679865987569912369L, row.getMetric("someLong"));
+      assertInputRow(row);
       Assert.assertFalse(iterator.hasNext());
     }
   }
 
+  private void assertInputRow(InputRow row)
+  {
+    Assert.assertEquals(DateTimes.of("2015-10-25T19:30:00.000Z"), row.getTimestamp());
+    Assert.assertEquals("type-a", Iterables.getOnlyElement(row.getDimension("eventType")));
+    Assert.assertEquals(679865987569912369L, row.getMetric("someLong"));
+  }
+
   private InputEntityReader createReader(
       ObjectMapper mapper,
       Map<String, Object> readerSchema
diff --git a/web-console/src/utils/ingestion-spec.spec.ts b/web-console/src/utils/ingestion-spec.spec.ts
index 19d3911..3577819 100644
--- a/web-console/src/utils/ingestion-spec.spec.ts
+++ b/web-console/src/utils/ingestion-spec.spec.ts
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-import { cleanSpec, downgradeSpec, upgradeSpec } from './ingestion-spec';
+import { cleanSpec, downgradeSpec, guessInputFormat, upgradeSpec } from './ingestion-spec';
 
 describe('ingestion-spec', () => {
   const oldSpec = {
@@ -120,4 +120,35 @@
       },
     });
   });
+
+  describe('guessInputFormat', () => {
+    it('works for parquet', () => {
+      expect(guessInputFormat(['PAR1lol']).type).toEqual('parquet');
+    });
+
+    it('works for orc', () => {
+      expect(guessInputFormat(['ORClol']).type).toEqual('orc');
+    });
+
+    it('works for Avro', () => {
+      expect(guessInputFormat(['Obj\x01lol']).type).toEqual('avro_ocf');
+      expect(guessInputFormat(['Obj1lol']).type).toEqual('regex');
+    });
+
+    it('works for JSON', () => {
+      expect(guessInputFormat(['{"a":1}']).type).toEqual('json');
+    });
+
+    it('works for TSV', () => {
+      expect(guessInputFormat(['A\tB\tX\tY']).type).toEqual('tsv');
+    });
+
+    it('works for CSV', () => {
+      expect(guessInputFormat(['A,B,X,Y']).type).toEqual('csv');
+    });
+
+    it('works for regex', () => {
+      expect(guessInputFormat(['A|B|X|Y']).type).toEqual('regex');
+    });
+  });
 });
diff --git a/web-console/src/utils/ingestion-spec.tsx b/web-console/src/utils/ingestion-spec.tsx
index a4cdbb8..996f8e0 100644
--- a/web-console/src/utils/ingestion-spec.tsx
+++ b/web-console/src/utils/ingestion-spec.tsx
@@ -2656,7 +2656,7 @@
   return deepSet(spec, 'spec.ioConfig.inputFormat', guessInputFormat(sampleData));
 }
 
-function guessInputFormat(sampleData: string[]): InputFormat {
+export function guessInputFormat(sampleData: string[]): InputFormat {
   let sampleDatum = sampleData[0];
   if (sampleDatum) {
     sampleDatum = String(sampleDatum); // Really ensure it is a string
@@ -2672,7 +2672,7 @@
       return inputFormatFromType('orc');
     }
     // Avro OCF 4 byte magic header: https://avro.apache.org/docs/current/spec.html#Object+Container+Files
-    if (sampleDatum.startsWith('Obj1')) {
+    if (sampleDatum.startsWith('Obj') && sampleDatum.charCodeAt(3) === 1) {
       return inputFormatFromType('avro_ocf');
     }
 
diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx
index 49b814e..af6056d 100644
--- a/web-console/src/views/load-data-view/load-data-view.tsx
+++ b/web-console/src/views/load-data-view/load-data-view.tsx
@@ -1235,9 +1235,7 @@
                 fillDataSourceNameIfNeeded(
                   fillInputFormat(
                     spec,
-                    filterMap(inputQueryState.data.data, l =>
-                      l.parsed ? l.parsed.raw : undefined,
-                    ),
+                    filterMap(inputQueryState.data.data, l => (l.input ? l.input.raw : undefined)),
                   ),
                 ),
               );
diff --git a/website/.spelling b/website/.spelling
index a783d32..5a4453e 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -613,6 +613,7 @@
 SchemaRepo
 avro
 avroBytesDecoder
+flattenSpec
 jq
 org.apache.druid.extensions
 schemaRepository