Avro union support (#10505)
* Avro union support
* Document new union support
* Add support for AvroStreamInputFormat and fix checkstyle
* Extend multi-member union test schema and format
* Some additional docs and add Enums to spelling
* Rename explodeUnions -> extractUnions
* explode -> extract
* ByType
* Correct spelling error
diff --git a/docs/development/extensions-core/avro.md b/docs/development/extensions-core/avro.md
index a200e83..75704cf 100644
--- a/docs/development/extensions-core/avro.md
+++ b/docs/development/extensions-core/avro.md
@@ -39,13 +39,30 @@
Druid supports most Avro types natively, there are however some exceptions which are detailed here.
-`union` types which aren't of the form `[null, otherType]` aren't supported at this time.
+#### Unions
+Druid has two modes for supporting `union` types.
+The default mode will treat unions as a single value regardless of the type it is populated with.
+
+If you wish to operate on each different member of a union however you can set `extractUnionsByType` on the Avro parser in which case unions will be expanded into nested objects according to the following rules:
+* Primitive types and unnamed complex types are keyed by their type name, e.g. `int`, `string`
+* Complex named types are keyed by their names, this includes `record`, `fixed` and `enum`.
+* The Avro null type is elided as its value can only ever be null
+
+This is safe because an Avro union can only contain a single member of each unnamed type and duplicates of the same named type are not allowed.
+i.e. only a single array is allowed; multiple records (or other named types) are allowed as long as each has a unique name.
+
+The members can then be accessed using a [flattenSpec](../../ingestion/data-formats.md#flattenspec) similar to other nested types.
+
+#### Binary types
`bytes` and `fixed` Avro types will be returned by default as base64 encoded strings unless the `binaryAsString` option is enabled on the Avro parser.
This setting will decode these types as UTF-8 strings.
+#### Enums
`enum` types will be returned as `string` of the enum symbol.
+#### Complex types
`record` and `map` types representing nested data can be ingested using [flattenSpec](../../ingestion/data-formats.md#flattenspec) on the parser.
-Druid doesn't currently support Avro logical types, they will be ignored and fields will be handled according to the underlying primitive type.
\ No newline at end of file
+#### Logical types
+Druid doesn't currently support Avro logical types, they will be ignored and fields will be handled according to the underlying primitive type.
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
index 39ce48e..165f6a4 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
@@ -34,18 +34,24 @@
{
private final ParseSpec parseSpec;
private final boolean fromPigAvroStorage;
+ private final boolean binaryAsString;
+ private final boolean extractUnionsByType;
private final ObjectFlattener<GenericRecord> avroFlattener;
private final MapInputRowParser mapParser;
@JsonCreator
public AvroHadoopInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
- @JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage
+ @JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage,
+ @JsonProperty("binaryAsString") Boolean binaryAsString,
+ @JsonProperty("extractUnionsByType") Boolean extractUnionsByType
)
{
this.parseSpec = parseSpec;
- this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
- this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false);
+ this.fromPigAvroStorage = fromPigAvroStorage != null && fromPigAvroStorage;
+ this.binaryAsString = binaryAsString != null && binaryAsString;
+ this.extractUnionsByType = extractUnionsByType != null && extractUnionsByType;
+ this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, this.binaryAsString, this.extractUnionsByType);
this.mapParser = new MapInputRowParser(parseSpec);
}
@@ -71,6 +77,6 @@
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
- return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage);
+ return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage, binaryAsString, extractUnionsByType);
}
}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
index f375c63..fcb8bd8 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
@@ -36,6 +36,8 @@
public class AvroStreamInputRowParser implements ByteBufferInputRowParser
{
private final ParseSpec parseSpec;
+ private final Boolean binaryAsString;
+ private final Boolean extractUnionsByType;
private final AvroBytesDecoder avroBytesDecoder;
private final ObjectFlattener<GenericRecord> avroFlattener;
private final MapInputRowParser mapParser;
@@ -43,12 +45,16 @@
@JsonCreator
public AvroStreamInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
- @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder
+ @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder,
+ @JsonProperty("binaryAsString") Boolean binaryAsString,
+ @JsonProperty("extractUnionsByType") Boolean extractUnionsByType
)
{
this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
- this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false);
+ this.binaryAsString = binaryAsString != null && binaryAsString;
+ this.extractUnionsByType = extractUnionsByType != null && extractUnionsByType;
+ this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, this.binaryAsString, this.extractUnionsByType);
this.mapParser = new MapInputRowParser(parseSpec);
}
@@ -76,7 +82,9 @@
{
return new AvroStreamInputRowParser(
parseSpec,
- avroBytesDecoder
+ avroBytesDecoder,
+ binaryAsString,
+ extractUnionsByType
);
}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
index 8f64532..8526a04 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
@@ -44,13 +44,8 @@
public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<GenericRecord>
{
- private static final JsonProvider AVRO_JSON_PROVIDER = new GenericAvroJsonProvider();
- private static final Configuration JSONPATH_CONFIGURATION =
- Configuration.builder()
- .jsonProvider(AVRO_JSON_PROVIDER)
- .mappingProvider(new NotImplementedMappingProvider())
- .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
- .build();
+ private final JsonProvider avroJsonProvider;
+ private final Configuration jsonPathConfiguration;
private static final EnumSet<Schema.Type> ROOT_TYPES = EnumSet.of(
Schema.Type.STRING,
@@ -99,10 +94,18 @@
* @param fromPigAvroStorage boolean to specify the data file is stored using AvroStorage
* @param binaryAsString boolean to encode the byte[] as a string.
*/
- public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString)
+ public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString, final boolean extractUnionsByType)
{
this.fromPigAvroStorage = fromPigAvroStorage;
this.binaryAsString = binaryAsString;
+
+ this.avroJsonProvider = new GenericAvroJsonProvider(extractUnionsByType);
+ this.jsonPathConfiguration =
+ Configuration.builder()
+ .jsonProvider(avroJsonProvider)
+ .mappingProvider(new NotImplementedMappingProvider())
+ .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
+ .build();
}
@Override
@@ -126,7 +129,7 @@
public Function<GenericRecord, Object> makeJsonPathExtractor(final String expr)
{
final JsonPath jsonPath = JsonPath.compile(expr);
- return record -> transformValue(jsonPath.read(record, JSONPATH_CONFIGURATION));
+ return record -> transformValue(jsonPath.read(record, jsonPathConfiguration));
}
@Override
@@ -138,7 +141,7 @@
@Override
public JsonProvider getJsonProvider()
{
- return AVRO_JSON_PROVIDER;
+ return avroJsonProvider;
}
@Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
index 90c74f6..7417477 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
@@ -42,6 +42,7 @@
private static final Logger LOGGER = new Logger(AvroOCFInputFormat.class);
private final boolean binaryAsString;
+ private final boolean extractUnionsByType;
@Nullable
private final Schema readerSchema;
@@ -50,7 +51,8 @@
@JacksonInject @Json ObjectMapper mapper,
@JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
@JsonProperty("schema") @Nullable Map<String, Object> schema,
- @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
+ @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
+ @JsonProperty("extractUnionsByType") @Nullable Boolean extractUnionsByType
) throws Exception
{
super(flattenSpec);
@@ -62,7 +64,8 @@
} else {
this.readerSchema = null;
}
- this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+ this.binaryAsString = binaryAsString != null && binaryAsString;
+ this.extractUnionsByType = extractUnionsByType != null && extractUnionsByType;
}
@Override
@@ -82,7 +85,8 @@
temporaryDirectory,
readerSchema,
getFlattenSpec(),
- binaryAsString
+ binaryAsString,
+ extractUnionsByType
);
}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
index 97ab895..dc2a6d7 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
@@ -56,14 +56,15 @@
File temporaryDirectory,
@Nullable Schema readerSchema,
JSONPathSpec flattenSpec,
- boolean binaryAsString
+ boolean binaryAsString,
+ boolean extractUnionsByType
)
{
this.inputRowSchema = inputRowSchema;
this.source = source;
this.temporaryDirectory = temporaryDirectory;
this.readerSchema = readerSchema;
- this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString));
+ this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString, extractUnionsByType));
}
@Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
index 9d5665e..6d399a8 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
@@ -39,7 +39,8 @@
public static ObjectFlattener<GenericRecord> makeFlattener(
final ParseSpec parseSpec,
final boolean fromPigAvroStorage,
- final boolean binaryAsString
+ final boolean binaryAsString,
+ final boolean extractUnionsByType
)
{
final JSONPathSpec flattenSpec;
@@ -49,7 +50,7 @@
flattenSpec = JSONPathSpec.DEFAULT;
}
- return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString));
+ return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString, extractUnionsByType));
}
public static List<InputRow> parseGenericRecord(
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
index 3b59b36..d42f359 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
@@ -35,6 +35,7 @@
public class AvroStreamInputFormat extends NestedInputFormat
{
private final boolean binaryAsString;
+ private final boolean extractUnionsByType;
private final AvroBytesDecoder avroBytesDecoder;
@@ -42,12 +43,14 @@
public AvroStreamInputFormat(
@JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
@JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder,
- @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
+ @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
+ @JsonProperty("extractUnionsByType") @Nullable Boolean extractUnionsByType
)
{
super(flattenSpec);
this.avroBytesDecoder = avroBytesDecoder;
- this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+ this.binaryAsString = binaryAsString != null && binaryAsString;
+ this.extractUnionsByType = extractUnionsByType != null && extractUnionsByType;
}
@Override
@@ -76,7 +79,8 @@
source,
avroBytesDecoder,
getFlattenSpec(),
- binaryAsString
+ binaryAsString,
+ extractUnionsByType
);
}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
index d8d92eb..c04ec14 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
@@ -52,13 +52,14 @@
InputEntity source,
AvroBytesDecoder avroBytesDecoder,
JSONPathSpec flattenSpec,
- boolean binaryAsString
+ boolean binaryAsString,
+ boolean extractUnionsByType
)
{
this.inputRowSchema = inputRowSchema;
this.source = source;
this.avroBytesDecoder = avroBytesDecoder;
- this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString));
+ this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString, extractUnionsByType));
}
@Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
index ab6a53e..86bbb7d 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
@@ -19,15 +19,19 @@
package org.apache.druid.data.input.avro;
+import com.google.common.collect.ImmutableMap;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import javax.annotation.Nullable;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -41,6 +45,13 @@
*/
public class GenericAvroJsonProvider implements JsonProvider
{
+ private final boolean extractUnionsByType;
+
+ GenericAvroJsonProvider(final boolean extractUnionsByType)
+ {
+ this.extractUnionsByType = extractUnionsByType;
+ }
+
@Override
public Object parse(final String s) throws InvalidJsonException
{
@@ -148,7 +159,11 @@
if (o == null) {
return null;
} else if (o instanceof GenericRecord) {
- return ((GenericRecord) o).get(s);
+ final GenericRecord record = (GenericRecord) o;
+ if (extractUnionsByType && isExtractableUnion(record.getSchema().getField(s))) {
+ return extractUnionTypes(record.get(s));
+ }
+ return record.get(s);
} else if (o instanceof Map) {
final Map theMap = (Map) o;
if (theMap.containsKey(s)) {
@@ -195,4 +210,46 @@
{
return o;
}
+
+ private boolean isExtractableUnion(final Schema.Field field)
+ {
+ return field.schema().isUnion() &&
+ field.schema().getTypes().stream().filter(type -> type.getType() != Schema.Type.NULL).count() > 1;
+ }
+
+ private Map<String, Object> extractUnionTypes(final Object o)
+ {
+ // Primitive types and unnamed complex types are keyed by their type name.
+ // Complex named types are keyed by their names.
+ // This is safe because an Avro union can only contain a single member of each unnamed type and duplicates
+ // of the same named type are not allowed. i.e. only a single array is allowed, multiple records are allowed as
+ // long as each has a unique name.
+ // The Avro null type is elided as its value can only ever be null
+ if (o instanceof Integer) {
+ return ImmutableMap.of("int", o);
+ } else if (o instanceof Long) {
+ return ImmutableMap.of("long", o);
+ } else if (o instanceof Float) {
+ return ImmutableMap.of("float", o);
+ } else if (o instanceof Double) {
+ return ImmutableMap.of("double", o);
+ } else if (o instanceof Boolean) {
+ return ImmutableMap.of("boolean", o);
+ } else if (o instanceof Utf8) {
+ return ImmutableMap.of("string", o);
+ } else if (o instanceof ByteBuffer) {
+ return ImmutableMap.of("bytes", o);
+ } else if (o instanceof Map) {
+ return ImmutableMap.of("map", o);
+ } else if (o instanceof List) {
+ return ImmutableMap.of("array", o);
+ } else if (o instanceof GenericRecord) {
+ return ImmutableMap.of(((GenericRecord) o).getSchema().getName(), o);
+ } else if (o instanceof GenericFixed) {
+ return ImmutableMap.of(((GenericFixed) o).getSchema().getName(), o);
+ } else if (o instanceof GenericEnumSymbol) {
+ return ImmutableMap.of(((GenericEnumSymbol<?>) o).getSchema().getName(), o);
+ }
+ return ImmutableMap.of();
+ }
}
diff --git a/extensions-core/avro-extensions/src/test/avro/some-datum.avsc b/extensions-core/avro-extensions/src/test/avro/some-datum.avsc
index bf8ea68..f03cb70 100644
--- a/extensions-core/avro-extensions/src/test/avro/some-datum.avsc
+++ b/extensions-core/avro-extensions/src/test/avro/some-datum.avsc
@@ -1,38 +1,181 @@
-[{
- "namespace": "org.apache.druid.data.input",
- "name": "SomeAvroDatum",
- "type": "record",
- "fields" : [
- {"name":"timestamp","type":"long"},
- {"name":"eventType","type":"string"},
- {"name":"id","type":"long"},
- {"name":"someOtherId","type":"long"},
- {"name":"isValid","type":"boolean"},
- {"name":"someIntArray","type":{"type":"array","items":"int"}},
- {"name":"someStringArray","type":{"type":"array","items":"string"}},
- {"name":"someIntValueMap","type":{"type":"map","values":"int"}},
- {"name":"someStringValueMap","type":{"type":"map","values":"string"}},
- {"name":"someUnion","type":["null","string"]},
- {"name":"someNull","type":"null"},
- {"name":"someFixed","type":{"type":"fixed","size":16,"name":"MyFixed"}},
- {"name":"someBytes","type":"bytes"},
- {"name":"someEnum","type":{"type":"enum","name":"MyEnum","symbols":["ENUM0","ENUM1","ENUM2"]}},
- {"name":"someRecord","type":{
- "type":"record","name":"MySubRecord","fields":[
- {"name":"subInt","type":"int"},
- {"name":"subLong","type":"long"}
- ]
- }},
-
- {"name":"someLong","type":"long"},
- {"name":"someInt","type":"int"},
- {"name":"someFloat","type":"float"},
- {"name":"someRecordArray","type":{
- "type":"array","items":{
- "type":"record","name":"MyNestedRecord","fields":[
- {"name":"nestedString","type":"string"}
+[
+ {
+ "namespace": "org.apache.druid.data.input",
+ "name": "SomeAvroDatum",
+ "type": "record",
+ "fields": [
+ {
+ "name": "timestamp",
+ "type": "long"
+ },
+ {
+ "name": "eventType",
+ "type": "string"
+ },
+ {
+ "name": "id",
+ "type": "long"
+ },
+ {
+ "name": "someOtherId",
+ "type": "long"
+ },
+ {
+ "name": "isValid",
+ "type": "boolean"
+ },
+ {
+ "name": "someIntArray",
+ "type": {
+ "type": "array",
+ "items": "int"
+ }
+ },
+ {
+ "name": "someStringArray",
+ "type": {
+ "type": "array",
+ "items": "string"
+ }
+ },
+ {
+ "name": "someIntValueMap",
+ "type": {
+ "type": "map",
+ "values": "int"
+ }
+ },
+ {
+ "name": "someStringValueMap",
+ "type": {
+ "type": "map",
+ "values": "string"
+ }
+ },
+ {
+ "name": "someUnion",
+ "type": [
+ "null",
+ "string"
]
- }
- }}
- ]
-}]
+ },
+ {
+ "name": "someMultiMemberUnion",
+ "type": [
+ "int",
+ "long",
+ "float",
+ "double",
+ "boolean",
+ "string",
+ "bytes",
+ {
+ "type": "map",
+ "values": "string"
+ },
+ {
+ "type": "array",
+ "items": "int"
+ },
+ {
+ "type": "record",
+ "name": "UnionSubRecord",
+ "fields": [
+ {
+ "name": "subString",
+ "type": "string"
+ }
+ ]
+ },
+ {
+ "name": "UnionSubFixed",
+ "type": "fixed",
+ "size": 10
+ },
+ {
+ "type": "enum",
+ "name": "UnionSubEnum",
+ "symbols": [
+ "ENUM0",
+ "ENUM1",
+ "ENUM2"
+ ]
+ }
+ ]
+ },
+ {
+ "name": "someNull",
+ "type": "null"
+ },
+ {
+ "name": "someFixed",
+ "type": {
+ "type": "fixed",
+ "size": 16,
+ "name": "MyFixed"
+ }
+ },
+ {
+ "name": "someBytes",
+ "type": "bytes"
+ },
+ {
+ "name": "someEnum",
+ "type": {
+ "type": "enum",
+ "name": "MyEnum",
+ "symbols": [
+ "ENUM0",
+ "ENUM1",
+ "ENUM2"
+ ]
+ }
+ },
+ {
+ "name": "someRecord",
+ "type": {
+ "type": "record",
+ "name": "MySubRecord",
+ "fields": [
+ {
+ "name": "subInt",
+ "type": "int"
+ },
+ {
+ "name": "subLong",
+ "type": "long"
+ }
+ ]
+ }
+ },
+ {
+ "name": "someLong",
+ "type": "long"
+ },
+ {
+ "name": "someInt",
+ "type": "int"
+ },
+ {
+ "name": "someFloat",
+ "type": "float"
+ },
+ {
+ "name": "someRecordArray",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "MyNestedRecord",
+ "fields": [
+ {
+ "name": "nestedString",
+ "type": "string"
+ }
+ ]
+ }
+ }
+ }
+ ]
+ }
+]
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
index 6e3cdb0..c6ade85 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
@@ -62,7 +62,7 @@
private void testParse(GenericRecord record, boolean fromPigAvroStorage) throws IOException
{
- AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, fromPigAvroStorage);
+ AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, fromPigAvroStorage, false, false);
AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
AvroHadoopInputRowParser.class
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
index 139616b..918aa0a 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
@@ -118,6 +118,7 @@
AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
flattenSpec,
new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+ false,
false
);
NestedInputFormat inputFormat2 = jsonMapper.readValue(
@@ -134,6 +135,7 @@
AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
flattenSpec,
new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null),
+ false,
false
);
NestedInputFormat inputFormat2 = jsonMapper.readValue(
@@ -150,6 +152,7 @@
AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
flattenSpec,
new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+ false,
false
);
NestedInputFormat inputFormat2 = jsonMapper.readValue(
@@ -193,6 +196,7 @@
AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
flattenSpec,
new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+ false,
false
);
NestedInputFormat inputFormat2 = jsonMapper.readValue(
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
index 0dbb6b9..8ed9f44 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
@@ -151,6 +151,7 @@
}
);
private static final String SOME_UNION_VALUE = "string as union";
+ private static final Integer SOME_UNION_MEMBER_VALUE = 1;
private static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8);
private static final String SOME_RECORD_STRING_VALUE = "string in record";
private static final List<MyNestedRecord> SOME_RECORD_ARRAY_VALUE = Collections.singletonList(MyNestedRecord.newBuilder()
@@ -176,7 +177,9 @@
Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC,
- new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+ new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+ false,
+ false
);
ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser),
@@ -193,7 +196,9 @@
Repository repository = new InMemoryRepository(null);
AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC,
- new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+ new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+ false,
+ false
);
ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser),
@@ -234,7 +239,9 @@
Repository repository = new InMemoryRepository(null);
AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC_SCHEMALESS,
- new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+ new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+ false,
+ false
);
ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser),
@@ -354,7 +361,7 @@
Assert.assertEquals(SOME_INT_VALUE, inputRow.getMetric("someInt"));
}
- public static SomeAvroDatum buildSomeAvroDatum()
+ private static SomeAvroDatum.Builder createSomeAvroDatumBuilderDefaults()
{
return SomeAvroDatum.newBuilder()
.setTimestamp(DATE_TIME.toInstant().toEpochMilli())
@@ -370,12 +377,24 @@
.setSomeIntValueMap(SOME_INT_VALUE_MAP_VALUE)
.setSomeStringValueMap(SOME_STRING_VALUE_MAP_VALUE)
.setSomeUnion(SOME_UNION_VALUE)
+ .setSomeMultiMemberUnion(SOME_UNION_MEMBER_VALUE)
.setSomeFixed(SOME_FIXED_VALUE)
.setSomeBytes(SOME_BYTES_VALUE)
.setSomeNull(null)
.setSomeEnum(MyEnum.ENUM1)
.setSomeRecord(SOME_RECORD_VALUE)
- .setSomeRecordArray(SOME_RECORD_ARRAY_VALUE)
- .build();
+ .setSomeRecordArray(SOME_RECORD_ARRAY_VALUE);
+ }
+
+ public static SomeAvroDatum buildSomeAvroDatum()
+ {
+ return createSomeAvroDatumBuilderDefaults().build();
+ }
+
+ public static SomeAvroDatum buildSomeAvroDatumWithUnionValue(Object unionValue)
+ {
+ return createSomeAvroDatumBuilderDefaults()
+ .setSomeMultiMemberUnion(unionValue)
+ .build();
}
}
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
index 0761672..e064e7a 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
@@ -19,11 +19,19 @@
package org.apache.druid.data.input.avro;
+import org.apache.avro.util.Utf8;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum;
+import org.apache.druid.data.input.UnionSubEnum;
+import org.apache.druid.data.input.UnionSubFixed;
+import org.apache.druid.data.input.UnionSubRecord;
import org.junit.Assert;
import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
public class AvroFlattenerMakerTest
@@ -33,7 +41,7 @@
public void getRootField()
{
final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
- final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false);
+ final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, true);
Assert.assertEquals(
record.getTimestamp(),
@@ -120,7 +128,7 @@
public void makeJsonPathExtractor()
{
final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
- final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false);
+ final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, true);
Assert.assertEquals(
record.getTimestamp(),
@@ -163,6 +171,10 @@
flattener.makeJsonPathExtractor("$.someUnion").apply(record)
);
Assert.assertEquals(
+ record.getSomeMultiMemberUnion(),
+ flattener.makeJsonPathExtractor("$.someMultiMemberUnion.int").apply(record)
+ );
+ Assert.assertEquals(
record.getSomeNull(),
flattener.makeJsonPathExtractor("$.someNull").apply(record)
);
@@ -219,11 +231,75 @@
);
}
+ @Test
+ public void jsonPathExtractorExtractUnionsByType()
+ {
+ final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, true);
+
+ // Unnamed types are accessed by type
+
+ // int
+ Assert.assertEquals(1, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.int").apply(
+ AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(1)));
+
+ // long
+ Assert.assertEquals(1L, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.long").apply(
+ AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(1L)));
+
+ // float
+ Assert.assertEquals((float) 1.0, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.float").apply(
+ AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue((float) 1.0)));
+
+ // double
+ Assert.assertEquals(1.0, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.double").apply(
+ AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(1.0)));
+
+ // string
+ Assert.assertEquals("string", flattener.makeJsonPathExtractor("$.someMultiMemberUnion.string").apply(
+ AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(new Utf8("string"))));
+
+ // bytes
+ Assert.assertArrayEquals(new byte[] {1}, (byte[]) flattener.makeJsonPathExtractor("$.someMultiMemberUnion.bytes").apply(
+ AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(ByteBuffer.wrap(new byte[] {1}))));
+
+ // map
+ Assert.assertEquals(2, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.map.two").apply(
+ AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(new HashMap<String, Integer>() {{
+ put("one", 1);
+ put("two", 2);
+ put("three", 3);
+ }
+ }
+ )));
+
+ // array
+ Assert.assertEquals(3, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.array[2]").apply(
+ AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(Arrays.asList(1, 2, 3))));
+
+ // Named types are accessed by name
+
+ // record
+ Assert.assertEquals("subRecordString", flattener.makeJsonPathExtractor("$.someMultiMemberUnion.UnionSubRecord.subString").apply(
+ AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(
+ UnionSubRecord.newBuilder()
+ .setSubString("subRecordString")
+ .build())));
+
+ // fixed
+ final byte[] fixedBytes = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+ Assert.assertEquals(fixedBytes, flattener.makeJsonPathExtractor("$.someMultiMemberUnion.UnionSubFixed").apply(
+ AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(new UnionSubFixed(fixedBytes))));
+
+ // enum
+ Assert.assertEquals(String.valueOf(UnionSubEnum.ENUM1), flattener.makeJsonPathExtractor("$.someMultiMemberUnion.UnionSubEnum").apply(
+ AvroStreamInputRowParserTest.buildSomeAvroDatumWithUnionValue(UnionSubEnum.ENUM1)));
+ }
+
@Test(expected = UnsupportedOperationException.class)
public void makeJsonQueryExtractor()
{
final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
- final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false);
+ final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, false);
Assert.assertEquals(
record.getTimestamp(),
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
index 4841483..b00c1e4 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
@@ -144,7 +144,7 @@
Assert.assertFalse(iterator.hasNext());
final Map<String, Object> rawColumns = row.getRawValues();
Assert.assertNotNull(rawColumns);
- Assert.assertEquals(19, rawColumns.size());
+ Assert.assertEquals(20, rawColumns.size());
final List<InputRow> inputRows = row.getInputRows();
Assert.assertNotNull(inputRows);
final InputRow inputRow = Iterables.getOnlyElement(inputRows);
@@ -201,7 +201,7 @@
final DimensionsSpec dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
"eventType")));
- final AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(mapper, null, readerSchema, null);
+ final AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(mapper, null, readerSchema, null, null);
final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, ColumnsFilter.all());
final FileEntity entity = new FileEntity(someAvroFile);
return inputFormat.createReader(schema, entity, temporaryFolder.newFolder());
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
index c8e9a45..796891e 100755
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
@@ -51,6 +51,7 @@
{
private final ParseSpec parseSpec;
private final boolean binaryAsString;
+ private final boolean extractUnionsByType;
private final TimestampSpec timestampSpec;
private final ObjectFlattener<GenericRecord> recordFlattener;
private final List<String> dimensions;
@@ -58,13 +59,15 @@
@JsonCreator
public ParquetAvroHadoopInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
- @JsonProperty("binaryAsString") Boolean binaryAsString
+ @JsonProperty("binaryAsString") Boolean binaryAsString,
+ @JsonProperty("extractUnionsByType") Boolean extractUnionsByType
)
{
this.parseSpec = parseSpec;
this.timestampSpec = parseSpec.getTimestampSpec();
this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
- this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+ this.binaryAsString = binaryAsString != null && binaryAsString;
+ this.extractUnionsByType = extractUnionsByType != null && extractUnionsByType;
final JSONPathSpec flattenSpec;
if (parseSpec instanceof AvroParseSpec) {
@@ -75,7 +78,7 @@
this.recordFlattener = ObjectFlatteners.create(
flattenSpec,
- new AvroFlattenerMaker(false, this.binaryAsString)
+ new AvroFlattenerMaker(false, this.binaryAsString, this.extractUnionsByType)
);
}
@@ -131,6 +134,6 @@
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
- return new ParquetAvroHadoopInputRowParser(parseSpec, binaryAsString);
+ return new ParquetAvroHadoopInputRowParser(parseSpec, binaryAsString, extractUnionsByType);
}
}
diff --git a/website/.spelling b/website/.spelling
index dd240c5..a0a1025 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -74,6 +74,7 @@
EMRFS
ETL
Elasticsearch
+Enums
FirehoseFactory
FlattenSpec
Float.NEGATIVE_INFINITY