| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.data.input; |
| |
| import com.fasterxml.jackson.databind.Module; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Function; |
| import com.google.common.base.Splitter; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import org.apache.avro.Schema; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.avro.io.DatumWriter; |
| import org.apache.avro.io.EncoderFactory; |
| import org.apache.avro.specific.SpecificDatumWriter; |
| import org.apache.druid.data.input.avro.AvroExtensionsModule; |
| import org.apache.druid.data.input.avro.AvroParseSpec; |
| import org.apache.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder; |
| import org.apache.druid.data.input.impl.DimensionsSpec; |
| import org.apache.druid.data.input.impl.TimestampSpec; |
| import org.apache.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper; |
| import org.apache.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter; |
| import org.apache.druid.jackson.DefaultObjectMapper; |
| import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; |
| import org.apache.druid.java.util.common.parsers.JSONPathFieldType; |
| import org.apache.druid.java.util.common.parsers.JSONPathSpec; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.schemarepo.InMemoryRepository; |
| import org.schemarepo.Repository; |
| import org.schemarepo.SchemaValidationException; |
| import org.schemarepo.api.TypedSchemaRepository; |
| import org.schemarepo.api.converter.AvroSchemaConverter; |
| import org.schemarepo.api.converter.IdentityConverter; |
| import org.schemarepo.api.converter.IntegerConverter; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.time.ZoneOffset; |
| import java.time.ZonedDateTime; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.regex.Pattern; |
| |
| public class AvroStreamInputRowParserTest |
| { |
| private static final String EVENT_TYPE = "eventType"; |
| private static final String ID = "id"; |
| private static final String SOME_OTHER_ID = "someOtherId"; |
| private static final String IS_VALID = "isValid"; |
| private static final String TOPIC = "aTopic"; |
| private static final String EVENT_TYPE_VALUE = "type-a"; |
| private static final long ID_VALUE = 1976491L; |
| private static final long SOME_OTHER_ID_VALUE = 6568719896L; |
| private static final float SOME_FLOAT_VALUE = 0.23555f; |
| private static final int SOME_INT_VALUE = 1; |
| private static final long SOME_LONG_VALUE = 679865987569912369L; |
| private static final ZonedDateTime DATE_TIME = ZonedDateTime.of(2015, 10, 25, 19, 30, 0, 0, ZoneOffset.UTC); |
| static final List<String> DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID); |
| private static final List<String> DIMENSIONS_SCHEMALESS = Arrays.asList( |
| "nested", |
| SOME_OTHER_ID, |
| "someStringArray", |
| "someIntArray", |
| "someFloat", |
| EVENT_TYPE, |
| "someFixed", |
| "someBytes", |
| "someUnion", |
| ID, |
| "someEnum", |
| "someLong", |
| "someInt", |
| "timestamp" |
| ); |
| static final AvroParseSpec PARSE_SPEC = new AvroParseSpec( |
| new TimestampSpec("nested", "millis", null), |
| new DimensionsSpec(DimensionsSpec.getDefaultSchemas(DIMENSIONS), Collections.emptyList(), null), |
| new JSONPathSpec( |
| true, |
| ImmutableList.of( |
| new JSONPathFieldSpec(JSONPathFieldType.PATH, "nested", "someRecord.subLong") |
| ) |
| ) |
| ); |
| private static final AvroParseSpec PARSE_SPEC_SCHEMALESS = new AvroParseSpec( |
| new TimestampSpec("nested", "millis", null), |
| new DimensionsSpec(null, null, null), |
| new JSONPathSpec( |
| true, |
| ImmutableList.of( |
| new JSONPathFieldSpec(JSONPathFieldType.PATH, "nested", "someRecord.subLong") |
| ) |
| ) |
| ); |
| private static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array()); |
| private static final long SUB_LONG_VALUE = 1543698L; |
| private static final int SUB_INT_VALUE = 4892; |
| private static final MySubRecord SOME_RECORD_VALUE = MySubRecord.newBuilder() |
| .setSubInt(SUB_INT_VALUE) |
| .setSubLong(SUB_LONG_VALUE) |
| .build(); |
| private static final List<CharSequence> SOME_STRING_ARRAY_VALUE = Arrays.asList("8", "4", "2", "1"); |
| private static final List<Integer> SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8); |
| private static final Map<CharSequence, Integer> SOME_INT_VALUE_MAP_VALUE = Maps.asMap( |
| new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, Integer>() |
| { |
| @Nonnull |
| @Override |
| public Integer apply(@Nullable CharSequence input) |
| { |
| return Integer.parseInt(input.toString()); |
| } |
| } |
| ); |
| private static final Map<CharSequence, CharSequence> SOME_STRING_VALUE_MAP_VALUE = Maps.asMap( |
| new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, CharSequence>() |
| { |
| @Nonnull |
| @Override |
| public CharSequence apply(@Nullable CharSequence input) |
| { |
| return input.toString(); |
| } |
| } |
| ); |
| private static final String SOME_UNION_VALUE = "string as union"; |
| private static final Integer SOME_UNION_MEMBER_VALUE = 1; |
| private static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8); |
| private static final String SOME_RECORD_STRING_VALUE = "string in record"; |
| private static final List<MyNestedRecord> SOME_RECORD_ARRAY_VALUE = Collections.singletonList(MyNestedRecord.newBuilder() |
| .setNestedString( |
| SOME_RECORD_STRING_VALUE) |
| .build()); |
| private static final Pattern BRACES_AND_SPACE = Pattern.compile("[{} ]"); |
| |
| private final ObjectMapper jsonMapper = new DefaultObjectMapper(); |
| |
| |
| @Before |
| public void before() |
| { |
| for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) { |
| jsonMapper.registerModule(jacksonModule); |
| } |
| } |
| |
| @Test |
| public void testSerde() throws IOException |
| { |
| Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io"); |
| AvroStreamInputRowParser parser = new AvroStreamInputRowParser( |
| PARSE_SPEC, |
| new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository), |
| false, |
| false |
| ); |
| ByteBufferInputRowParser parser2 = jsonMapper.readValue( |
| jsonMapper.writeValueAsString(parser), |
| ByteBufferInputRowParser.class |
| ); |
| |
| Assert.assertEquals(parser, parser2); |
| } |
| |
| @Test |
| public void testParse() throws SchemaValidationException, IOException |
| { |
| // serde test |
| Repository repository = new InMemoryRepository(null); |
| AvroStreamInputRowParser parser = new AvroStreamInputRowParser( |
| PARSE_SPEC, |
| new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository), |
| false, |
| false |
| ); |
| ByteBufferInputRowParser parser2 = jsonMapper.readValue( |
| jsonMapper.writeValueAsString(parser), |
| ByteBufferInputRowParser.class |
| ); |
| repository = ((SchemaRepoBasedAvroBytesDecoder) ((AvroStreamInputRowParser) parser2).getAvroBytesDecoder()).getSchemaRepository(); |
| |
| // prepare data |
| GenericRecord someAvroDatum = buildSomeAvroDatum(); |
| |
| // encode schema id |
| Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC); |
| TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<>( |
| repository, |
| new IntegerConverter(), |
| new AvroSchemaConverter(), |
| new IdentityConverter() |
| ); |
| Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema()); |
| ByteBuffer byteBuffer = ByteBuffer.allocate(4); |
| converter.putSubjectAndId(id, byteBuffer); |
| ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| out.write(byteBuffer.array()); |
| // encode data |
| DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema()); |
| // write avro datum to bytes |
| writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); |
| |
| InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); |
| |
| assertInputRowCorrect(inputRow, DIMENSIONS, false); |
| } |
| |
| @Test |
| public void testParseSchemaless() throws SchemaValidationException, IOException |
| { |
| // serde test |
| Repository repository = new InMemoryRepository(null); |
| AvroStreamInputRowParser parser = new AvroStreamInputRowParser( |
| PARSE_SPEC_SCHEMALESS, |
| new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository), |
| false, |
| false |
| ); |
| ByteBufferInputRowParser parser2 = jsonMapper.readValue( |
| jsonMapper.writeValueAsString(parser), |
| ByteBufferInputRowParser.class |
| ); |
| repository = ((SchemaRepoBasedAvroBytesDecoder) ((AvroStreamInputRowParser) parser2).getAvroBytesDecoder()).getSchemaRepository(); |
| |
| // prepare data |
| GenericRecord someAvroDatum = buildSomeAvroDatum(); |
| |
| // encode schema id |
| Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC); |
| TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<>( |
| repository, |
| new IntegerConverter(), |
| new AvroSchemaConverter(), |
| new IdentityConverter() |
| ); |
| Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema()); |
| ByteBuffer byteBuffer = ByteBuffer.allocate(4); |
| converter.putSubjectAndId(id, byteBuffer); |
| try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { |
| out.write(byteBuffer.array()); |
| // encode data |
| DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema()); |
| // write avro datum to bytes |
| writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); |
| |
| InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); |
| |
| assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS, false); |
| } |
| } |
| |
| static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimensions, boolean isFromPigAvro) |
| { |
| Assert.assertEquals(expectedDimensions, inputRow.getDimensions()); |
| Assert.assertEquals(1543698L, inputRow.getTimestampFromEpoch()); |
| |
| // test dimensions |
| Assert.assertEquals(Collections.singletonList(EVENT_TYPE_VALUE), inputRow.getDimension(EVENT_TYPE)); |
| Assert.assertEquals(Collections.singletonList(String.valueOf(ID_VALUE)), inputRow.getDimension(ID)); |
| Assert.assertEquals( |
| Collections.singletonList(String.valueOf(SOME_OTHER_ID_VALUE)), |
| inputRow.getDimension(SOME_OTHER_ID) |
| ); |
| Assert.assertEquals(Collections.singletonList(String.valueOf(true)), inputRow.getDimension(IS_VALID)); |
| |
| // someRecordArray represents a record generated from Pig using AvroStorage |
| // as it implicitly converts array elements to a record |
| if (isFromPigAvro) { |
| Assert.assertEquals( |
| Collections.singletonList(SOME_RECORD_ARRAY_VALUE.get(0).getNestedString()), |
| inputRow.getDimension("someRecordArray") |
| ); |
| } else { |
| Assert.assertEquals( |
| Lists.transform(SOME_INT_ARRAY_VALUE, String::valueOf), |
| inputRow.getDimension("someIntArray") |
| ); |
| Assert.assertEquals( |
| Lists.transform(SOME_STRING_ARRAY_VALUE, String::valueOf), |
| inputRow.getDimension("someStringArray") |
| ); |
| |
| } |
| // towards Map avro field as druid dimension, need to convert its toString() back to HashMap to check equality |
| Assert.assertEquals(1, inputRow.getDimension("someIntValueMap").size()); |
| Assert.assertEquals( |
| SOME_INT_VALUE_MAP_VALUE, |
| new HashMap<CharSequence, Integer>( |
| Maps.transformValues( |
| Splitter |
| .on(",") |
| .withKeyValueSeparator("=") |
| .split(BRACES_AND_SPACE.matcher(inputRow.getDimension("someIntValueMap").get(0)).replaceAll("")), |
| new Function<String, Integer>() |
| { |
| @Nullable |
| @Override |
| public Integer apply(@Nullable String input) |
| { |
| return Integer.valueOf(input); |
| } |
| } |
| ) |
| ) |
| ); |
| Assert.assertEquals( |
| SOME_STRING_VALUE_MAP_VALUE, |
| new HashMap<CharSequence, CharSequence>( |
| Splitter |
| .on(",") |
| .withKeyValueSeparator("=") |
| .split(BRACES_AND_SPACE.matcher(inputRow.getDimension("someIntValueMap").get(0)).replaceAll("")) |
| ) |
| ); |
| Assert.assertEquals(Collections.singletonList(SOME_UNION_VALUE), inputRow.getDimension("someUnion")); |
| Assert.assertEquals(Collections.emptyList(), inputRow.getDimension("someNull")); |
| Assert.assertEquals( |
| Arrays.toString(SOME_FIXED_VALUE.bytes()), |
| Arrays.toString((byte[]) (inputRow.getRaw("someFixed"))) |
| ); |
| Assert.assertEquals( |
| Arrays.toString(SOME_BYTES_VALUE.array()), |
| Arrays.toString((byte[]) (inputRow.getRaw("someBytes"))) |
| ); |
| Assert.assertEquals(Collections.singletonList(String.valueOf(MyEnum.ENUM1)), inputRow.getDimension("someEnum")); |
| Assert.assertEquals( |
| Collections.singletonList(String.valueOf(SOME_RECORD_VALUE)), |
| inputRow.getDimension("someRecord") |
| ); |
| |
| // test metrics |
| Assert.assertEquals(SOME_FLOAT_VALUE, inputRow.getMetric("someFloat").floatValue(), 0); |
| Assert.assertEquals(SOME_LONG_VALUE, inputRow.getMetric("someLong")); |
| Assert.assertEquals(SOME_INT_VALUE, inputRow.getMetric("someInt")); |
| } |
| |
| private static SomeAvroDatum.Builder createSomeAvroDatumBuilderDefaults() |
| { |
| return SomeAvroDatum.newBuilder() |
| .setTimestamp(DATE_TIME.toInstant().toEpochMilli()) |
| .setEventType(EVENT_TYPE_VALUE) |
| .setId(ID_VALUE) |
| .setSomeOtherId(SOME_OTHER_ID_VALUE) |
| .setIsValid(true) |
| .setSomeFloat(SOME_FLOAT_VALUE) |
| .setSomeInt(SOME_INT_VALUE) |
| .setSomeLong(SOME_LONG_VALUE) |
| .setSomeIntArray(SOME_INT_ARRAY_VALUE) |
| .setSomeStringArray(SOME_STRING_ARRAY_VALUE) |
| .setSomeIntValueMap(SOME_INT_VALUE_MAP_VALUE) |
| .setSomeStringValueMap(SOME_STRING_VALUE_MAP_VALUE) |
| .setSomeUnion(SOME_UNION_VALUE) |
| .setSomeMultiMemberUnion(SOME_UNION_MEMBER_VALUE) |
| .setSomeFixed(SOME_FIXED_VALUE) |
| .setSomeBytes(SOME_BYTES_VALUE) |
| .setSomeNull(null) |
| .setSomeEnum(MyEnum.ENUM1) |
| .setSomeRecord(SOME_RECORD_VALUE) |
| .setSomeRecordArray(SOME_RECORD_ARRAY_VALUE); |
| } |
| |
| public static SomeAvroDatum buildSomeAvroDatum() |
| { |
| return createSomeAvroDatumBuilderDefaults().build(); |
| } |
| |
| public static SomeAvroDatum buildSomeAvroDatumWithUnionValue(Object unionValue) |
| { |
| return createSomeAvroDatumBuilderDefaults() |
| .setSomeMultiMemberUnion(unionValue) |
| .build(); |
| } |
| } |