Fix Avro support in Web Console (#10232)
* Fix Avro OCF detection prefix and run format detection on raw input
* Support Avro Fixed and Enum types correctly
* Check Avro version byte in format detection
* Add test for AvroOCFReader.sample
Ensures that the Sampler doesn't receive raw input that it can't
serialize into JSON.
* Document Avro type handling
* Add TS unit tests for guessInputFormat
diff --git a/docs/development/extensions-core/avro.md b/docs/development/extensions-core/avro.md
index 3ec4c70..a200e83 100644
--- a/docs/development/extensions-core/avro.md
+++ b/docs/development/extensions-core/avro.md
@@ -29,4 +29,23 @@
See [Avro Hadoop Parser](../../ingestion/data-formats.md#avro-hadoop-parser) and [Avro Stream Parser](../../ingestion/data-formats.md#avro-stream-parser)
for more details about how to use these in an ingestion spec.
-Make sure to [include](../../development/extensions.md#loading-extensions) `druid-avro-extensions` as an extension.
\ No newline at end of file
+Additionally, it provides an InputFormat for reading Avro OCF files when using
+[native batch indexing](../../ingestion/native-batch.md), see [Avro OCF](../../ingestion/data-formats.md#avro-ocf)
+for details on how to ingest OCF files.
+
+Make sure to [include](../../development/extensions.md#loading-extensions) `druid-avro-extensions` as an extension.
+
+### Avro Types
+
+Druid supports most Avro types natively; however, there are some exceptions, which are detailed here.
+
+`union` types which aren't of the form `[null, otherType]` aren't supported at this time.
+
+`bytes` and `fixed` Avro types will be returned by default as base64 encoded strings unless the `binaryAsString` option is enabled on the Avro parser.
+This setting will decode these types as UTF-8 strings.
+
+`enum` types will be returned as a `string` of the enum symbol.
+
+`record` and `map` types representing nested data can be ingested using [flattenSpec](../../ingestion/data-formats.md#flattenspec) on the parser.
+
+Druid doesn't currently support Avro logical types; they will be ignored, and fields will be handled according to the underlying primitive type.
\ No newline at end of file
diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index 45562f4..21adee1 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -227,6 +227,8 @@
> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro OCF input format.
+> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid
+
The `inputFormat` to load data of Avro OCF format. An example is:
```json
"ioConfig": {
@@ -343,6 +345,8 @@
> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Hadoop Parser.
+> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid
+
This parser is for [Hadoop batch ingestion](./hadoop.md).
The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"org.apache.druid.data.input.avro.AvroValueInputFormat"`.
You may want to set Avro reader's schema in `jobProperties` in `tuningConfig`,
@@ -865,6 +869,8 @@
> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream Parser.
+> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid
+
This parser is for [stream ingestion](./index.md#streaming) and reads Avro data from a stream directly.
| Field | Type | Description | Required |
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
index e5c0a04..8f64532 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
@@ -25,7 +25,9 @@
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.druid.java.util.common.StringUtils;
@@ -56,7 +58,9 @@
Schema.Type.INT,
Schema.Type.LONG,
Schema.Type.FLOAT,
- Schema.Type.DOUBLE
+ Schema.Type.DOUBLE,
+ Schema.Type.ENUM,
+ Schema.Type.FIXED
);
private static boolean isPrimitive(Schema schema)
@@ -137,9 +141,15 @@
return AVRO_JSON_PROVIDER;
}
+ @Override
+ public Object finalizeConversionForMap(Object o)
+ {
+ return transformValue(o);
+ }
+
private Object transformValue(final Object field)
{
- if (fromPigAvroStorage && field instanceof GenericData.Array) {
+ if (fromPigAvroStorage && field instanceof GenericArray) {
return Lists.transform((List) field, item -> String.valueOf(((GenericRecord) item).get(0)));
}
if (field instanceof ByteBuffer) {
@@ -152,6 +162,14 @@
return field.toString();
} else if (field instanceof List) {
return ((List<?>) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
+ } else if (field instanceof GenericEnumSymbol) {
+ return field.toString();
+ } else if (field instanceof GenericFixed) {
+ if (binaryAsString) {
+ return StringUtils.fromUtf8(((GenericFixed) field).bytes());
+ } else {
+ return ((GenericFixed) field).bytes();
+ }
}
return field;
}
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
index 4572ec1..0dbb6b9 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
@@ -19,10 +19,8 @@
package org.apache.druid.data.input;
-import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
@@ -40,6 +38,7 @@
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper;
import org.apache.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -90,10 +89,12 @@
"someStringArray",
"someIntArray",
"someFloat",
- "someUnion",
EVENT_TYPE,
- ID,
+ "someFixed",
"someBytes",
+ "someUnion",
+ ID,
+ "someEnum",
"someLong",
"someInt",
"timestamp"
@@ -158,14 +159,12 @@
.build());
private static final Pattern BRACES_AND_SPACE = Pattern.compile("[{} ]");
- private final ObjectMapper jsonMapper = new ObjectMapper();
+ private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Before
public void before()
{
- jsonMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
- jsonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
@@ -335,7 +334,10 @@
);
Assert.assertEquals(Collections.singletonList(SOME_UNION_VALUE), inputRow.getDimension("someUnion"));
Assert.assertEquals(Collections.emptyList(), inputRow.getDimension("someNull"));
- Assert.assertEquals(SOME_FIXED_VALUE, inputRow.getRaw("someFixed"));
+ Assert.assertEquals(
+ Arrays.toString(SOME_FIXED_VALUE.bytes()),
+ Arrays.toString((byte[]) (inputRow.getRaw("someFixed")))
+ );
Assert.assertEquals(
Arrays.toString(SOME_BYTES_VALUE.array()),
Arrays.toString((byte[]) (inputRow.getRaw("someBytes")))
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
index 6becdf7..0761672 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
@@ -80,7 +80,8 @@
flattener.getRootField(record, "someNull")
);
Assert.assertEquals(
- record.getSomeFixed(),
+      // Cast to a byte array by transformValue
+ record.getSomeFixed().bytes(),
flattener.getRootField(record, "someFixed")
);
Assert.assertEquals(
@@ -89,7 +90,8 @@
flattener.getRootField(record, "someBytes")
);
Assert.assertEquals(
- record.getSomeEnum(),
+      // Cast to a string by transformValue
+ record.getSomeEnum().toString(),
flattener.getRootField(record, "someEnum")
);
Assert.assertEquals(
@@ -165,7 +167,8 @@
flattener.makeJsonPathExtractor("$.someNull").apply(record)
);
Assert.assertEquals(
- record.getSomeFixed(),
+      // Cast to a byte array by transformValue
+ record.getSomeFixed().bytes(),
flattener.makeJsonPathExtractor("$.someFixed").apply(record)
);
Assert.assertEquals(
@@ -174,7 +177,8 @@
flattener.makeJsonPathExtractor("$.someBytes").apply(record)
);
Assert.assertEquals(
- record.getSomeEnum(),
+      // Cast to a string by transformValue
+ record.getSomeEnum().toString(),
flattener.makeJsonPathExtractor("$.someEnum").apply(record)
);
Assert.assertEquals(
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
index fe6070b..da586d8 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
@@ -29,6 +29,7 @@
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FileEntity;
@@ -128,18 +129,66 @@
}
}
+ @Test
+ public void testSample() throws Exception
+ {
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
+ );
+ final InputEntityReader reader = createReader(mapper, null);
+ try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
+ Assert.assertTrue(iterator.hasNext());
+ final InputRowListPlusRawValues row = iterator.next();
+ Assert.assertFalse(iterator.hasNext());
+ final Map<String, Object> rawColumns = row.getRawValues();
+ Assert.assertNotNull(rawColumns);
+ Assert.assertEquals(19, rawColumns.size());
+ final List<InputRow> inputRows = row.getInputRows();
+ Assert.assertNotNull(inputRows);
+ final InputRow inputRow = Iterables.getOnlyElement(inputRows);
+ assertInputRow(inputRow);
+ }
+ }
+
+ @Test
+ public void testSampleSerdeRaw() throws Exception
+ {
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
+ );
+ final InputEntityReader reader = createReader(mapper, null);
+ try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
+ Assert.assertTrue(iterator.hasNext());
+ final InputRowListPlusRawValues row = iterator.next();
+ Assert.assertFalse(iterator.hasNext());
+ final List<InputRow> inputRows = row.getInputRows();
+ Assert.assertNotNull(inputRows);
+ final InputRow inputRow = Iterables.getOnlyElement(inputRows);
+ assertInputRow(inputRow);
+ // Ensure the raw values can be serialised into JSON
+ mapper.writeValueAsString(row.getRawValues());
+ }
+ }
+
private void assertRow(InputEntityReader reader) throws IOException
{
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertTrue(iterator.hasNext());
final InputRow row = iterator.next();
- Assert.assertEquals(DateTimes.of("2015-10-25T19:30:00.000Z"), row.getTimestamp());
- Assert.assertEquals("type-a", Iterables.getOnlyElement(row.getDimension("eventType")));
- Assert.assertEquals(679865987569912369L, row.getMetric("someLong"));
+ assertInputRow(row);
Assert.assertFalse(iterator.hasNext());
}
}
+ private void assertInputRow(InputRow row)
+ {
+ Assert.assertEquals(DateTimes.of("2015-10-25T19:30:00.000Z"), row.getTimestamp());
+ Assert.assertEquals("type-a", Iterables.getOnlyElement(row.getDimension("eventType")));
+ Assert.assertEquals(679865987569912369L, row.getMetric("someLong"));
+ }
+
private InputEntityReader createReader(
ObjectMapper mapper,
Map<String, Object> readerSchema
diff --git a/web-console/src/utils/ingestion-spec.spec.ts b/web-console/src/utils/ingestion-spec.spec.ts
index 19d3911..3577819 100644
--- a/web-console/src/utils/ingestion-spec.spec.ts
+++ b/web-console/src/utils/ingestion-spec.spec.ts
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-import { cleanSpec, downgradeSpec, upgradeSpec } from './ingestion-spec';
+import { cleanSpec, downgradeSpec, guessInputFormat, upgradeSpec } from './ingestion-spec';
describe('ingestion-spec', () => {
const oldSpec = {
@@ -120,4 +120,35 @@
},
});
});
+
+ describe('guessInputFormat', () => {
+ it('works for parquet', () => {
+ expect(guessInputFormat(['PAR1lol']).type).toEqual('parquet');
+ });
+
+ it('works for orc', () => {
+ expect(guessInputFormat(['ORClol']).type).toEqual('orc');
+ });
+
+ it('works for AVRO', () => {
+ expect(guessInputFormat(['Obj\x01lol']).type).toEqual('avro_ocf');
+ expect(guessInputFormat(['Obj1lol']).type).toEqual('regex');
+ });
+
+ it('works for JSON', () => {
+ expect(guessInputFormat(['{"a":1}']).type).toEqual('json');
+ });
+
+ it('works for TSV', () => {
+ expect(guessInputFormat(['A\tB\tX\tY']).type).toEqual('tsv');
+ });
+
+ it('works for CSV', () => {
+ expect(guessInputFormat(['A,B,X,Y']).type).toEqual('csv');
+ });
+
+ it('works for regex', () => {
+ expect(guessInputFormat(['A|B|X|Y']).type).toEqual('regex');
+ });
+ });
});
diff --git a/web-console/src/utils/ingestion-spec.tsx b/web-console/src/utils/ingestion-spec.tsx
index a4cdbb8..996f8e0 100644
--- a/web-console/src/utils/ingestion-spec.tsx
+++ b/web-console/src/utils/ingestion-spec.tsx
@@ -2656,7 +2656,7 @@
return deepSet(spec, 'spec.ioConfig.inputFormat', guessInputFormat(sampleData));
}
-function guessInputFormat(sampleData: string[]): InputFormat {
+export function guessInputFormat(sampleData: string[]): InputFormat {
let sampleDatum = sampleData[0];
if (sampleDatum) {
sampleDatum = String(sampleDatum); // Really ensure it is a string
@@ -2672,7 +2672,7 @@
return inputFormatFromType('orc');
}
// Avro OCF 4 byte magic header: https://avro.apache.org/docs/current/spec.html#Object+Container+Files
- if (sampleDatum.startsWith('Obj1')) {
+ if (sampleDatum.startsWith('Obj') && sampleDatum.charCodeAt(3) === 1) {
return inputFormatFromType('avro_ocf');
}
diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx
index 49b814e..af6056d 100644
--- a/web-console/src/views/load-data-view/load-data-view.tsx
+++ b/web-console/src/views/load-data-view/load-data-view.tsx
@@ -1235,9 +1235,7 @@
fillDataSourceNameIfNeeded(
fillInputFormat(
spec,
- filterMap(inputQueryState.data.data, l =>
- l.parsed ? l.parsed.raw : undefined,
- ),
+ filterMap(inputQueryState.data.data, l => (l.input ? l.input.raw : undefined)),
),
),
);
diff --git a/website/.spelling b/website/.spelling
index a783d32..5a4453e 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -613,6 +613,7 @@
SchemaRepo
avro
avroBytesDecoder
+flattenSpec
jq
org.apache.druid.extensions
schemaRepository