// blob: 009b5a60b076e2e641e2f5a6212a0db3396a1a2f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.DynamicMessage;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.commons.io.IOUtils;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
/**
 * Unit tests for {@link SchemaRegistryBasedProtobufBytesDecoder}.
 *
 * <p>The schema registry is replaced by a Mockito mock, so no network access happens in the
 * parsing tests. The remaining tests exercise construction and JSON deserialization of the
 * decoder.
 */
public class SchemaRegistryBasedProtobufBytesDecoderTest
{
  /** Schema id written into every test frame and stubbed on the mocked registry. */
  private static final int SCHEMA_ID = 1234;

  private SchemaRegistryClient registry;

  @Before
  public void setUp()
  {
    registry = Mockito.mock(CachedSchemaRegistryClient.class);
  }

  /**
   * A well-formed frame whose schema id resolves in the registry is decoded into a message
   * carrying the same {@code id} field as the event that was serialized.
   */
  @Test
  public void testParse() throws Exception
  {
    // Given
    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(SCHEMA_ID))).thenReturn(parseProtobufSchema());
    ProtoTestEventWrapper.ProtoTestEvent event = getTestEvent();
    ByteBuffer bb = frameWithSchemaId(event.toByteArray());

    // When
    DynamicMessage actual = new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb);

    // Then
    Assert.assertEquals(event.getId(), actual.getField(actual.getDescriptorForType().findFieldByName("id")));
  }

  /**
   * A frame whose payload is only a slice of the serialized event (and which lacks the second
   * single-byte prefix the valid frames carry) must be rejected with a {@link ParseException}.
   */
  @Test(expected = ParseException.class)
  public void testParseCorrupted() throws Exception
  {
    // Given
    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(SCHEMA_ID))).thenReturn(parseProtobufSchema());
    byte[] bytes = getTestEvent().toByteArray();
    // Only 10 bytes of the payload, starting at offset 5, are copied after the schema id.
    ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(SCHEMA_ID).put(bytes, 5, 10);
    bb.rewind();

    // When
    new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb);
  }

  /**
   * When the registry lookup fails with an {@link IOException}, the decoder must surface a
   * {@link ParseException}.
   */
  @Test(expected = ParseException.class)
  public void testParseWrongId() throws Exception
  {
    // Given
    Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
    ByteBuffer bb = frameWithSchemaId(getTestEvent().toByteArray());

    // When
    new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb);
  }

  /** Passing {@code null} as the capacity yields an identity map capacity of {@link Integer#MAX_VALUE}. */
  @Test
  public void testDefaultCapacity()
  {
    // Given
    SchemaRegistryBasedProtobufBytesDecoder decoder =
        new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null, null);

    // Then
    Assert.assertEquals(Integer.MAX_VALUE, decoder.getIdentityMapCapacity());
  }

  /** An explicitly supplied capacity is reported back unchanged. */
  @Test
  public void testGivenCapacity()
  {
    // Given
    int capacity = 100;
    SchemaRegistryBasedProtobufBytesDecoder decoder =
        new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null, null);

    // Then
    Assert.assertEquals(capacity, decoder.getIdentityMapCapacity());
  }

  /** A spec using the {@code urls} array form deserializes into a decoder. */
  @Test
  public void testMultipleUrls() throws Exception
  {
    String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";

    SchemaRegistryBasedProtobufBytesDecoder decoder = readDecoder(json);

    // Then
    Assert.assertNotEquals(0, decoder.hashCode());
  }

  /** A spec using the single {@code url} form deserializes into a decoder. */
  @Test
  public void testUrl() throws Exception
  {
    String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";

    SchemaRegistryBasedProtobufBytesDecoder decoder = readDecoder(json);

    // Then
    Assert.assertNotEquals(0, decoder.hashCode());
  }

  /** A spec with an empty {@code config} object deserializes into a decoder. */
  @Test
  public void testConfig() throws Exception
  {
    String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";

    SchemaRegistryBasedProtobufBytesDecoder decoder = readDecoder(json);

    // Then
    Assert.assertNotEquals(0, decoder.hashCode());
  }

  /**
   * Headers declared through a dynamic config provider are merged with the statically declared
   * ones. The expectations below show that for a key present in both
   * ({@code registry.header.prop.2}) the provider's value is the one that is kept, and that the
   * provider entry itself does not appear in the resolved map.
   */
  @Test
  public void testParseHeader() throws JsonProcessingException
  {
    String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";

    SchemaRegistryBasedProtobufBytesDecoder decoder = readDecoder(json);
    Map<String, String> headers = DynamicConfigProviderUtils.extraConfigAndSetStringMap(
        decoder.getHeaders(),
        SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
        new DefaultObjectMapper()
    );

    // Then
    Assert.assertEquals(3, headers.size());
    Assert.assertEquals("value.1", headers.get("registry.header.prop.1"));
    Assert.assertEquals("value.2", headers.get("registry.header.prop.2"));
    Assert.assertEquals("value.3", headers.get("registry.header.prop.3"));
  }

  /**
   * Same merge expectations as {@link #testParseHeader()}, but for the {@code config} section of
   * the spec, resolved as an object-valued map.
   */
  @Test
  public void testParseConfig() throws JsonProcessingException
  {
    String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";

    SchemaRegistryBasedProtobufBytesDecoder decoder = readDecoder(json);
    Map<String, ?> config = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(
        decoder.getConfig(),
        SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
        new DefaultObjectMapper()
    );

    // Then
    Assert.assertEquals(3, config.size());
    Assert.assertEquals("value.1", config.get("registry.config.prop.1"));
    Assert.assertEquals("value.2", config.get("registry.config.prop.2"));
    Assert.assertEquals("value.3", config.get("registry.config.prop.3"));
  }

  /**
   * Deserializes a decoder spec through the polymorphic {@link ProtobufBytesDecoder} type, using
   * a mapper that can inject an {@link ObjectMapper} into the decoder.
   *
   * @param json decoder spec as a JSON object
   * @return the deserialized decoder
   * @throws JsonProcessingException if the JSON cannot be read into a decoder
   */
  private static SchemaRegistryBasedProtobufBytesDecoder readDecoder(String json) throws JsonProcessingException
  {
    ObjectMapper mapper = new DefaultObjectMapper();
    mapper.setInjectableValues(
        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
    );
    return (SchemaRegistryBasedProtobufBytesDecoder) mapper
        .readerFor(ProtobufBytesDecoder.class)
        .readValue(json);
  }

  /**
   * Builds the frame used by the well-formed parsing tests: one zero byte, the 4-byte
   * {@link #SCHEMA_ID}, one more zero byte, then the serialized message. The returned buffer is
   * rewound so it is ready to be read from the start.
   *
   * <p>NOTE(review): the two zero bytes presumably correspond to the Confluent magic byte and
   * message-index marker; confirm against the decoder implementation.
   *
   * @param payload serialized protobuf message
   * @return rewound buffer holding the 6-byte prefix followed by {@code payload}
   */
  private static ByteBuffer frameWithSchemaId(byte[] payload)
  {
    ByteBuffer bb = ByteBuffer.allocate(payload.length + 6)
                              .put((byte) 0)
                              .putInt(SCHEMA_ID)
                              .put((byte) 0)
                              .put(payload);
    bb.rewind();
    return bb;
  }

  /**
   * Builds the flat test event shared with {@link ProtobufInputRowParserTest}, stamped with
   * 2012-07-12T09:30 UTC.
   */
  private ProtoTestEventWrapper.ProtoTestEvent getTestEvent()
  {
    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
    return ProtobufInputRowParserTest.buildFlatData(dateTime);
  }

  /**
   * Builds the schema returned by the mocked registry from {@code ProtoTest.proto} on the test
   * classpath, with {@code google/protobuf/timestamp.proto} supplied as a resolved dependency.
   *
   * @throws IOException if a resource cannot be read
   */
  private ProtobufSchema parseProtobufSchema() throws IOException
  {
    String protobufString = readResource("ProtoTest.proto");
    String timestampProtobufString = readResource("google/protobuf/timestamp.proto");
    return new ProtobufSchema(
        protobufString,
        Collections.emptyList(),
        ImmutableMap.of("google/protobuf/timestamp.proto", timestampProtobufString),
        null,
        null
    );
  }

  /**
   * Reads a classpath resource fully as UTF-8 text and closes the underlying stream.
   *
   * @param name resource path relative to the classpath root
   * @return the resource content
   * @throws IOException if reading fails
   */
  private String readResource(String name) throws IOException
  {
    try (InputStream in = getClass().getClassLoader().getResourceAsStream(name)) {
      // Fail with the resource name instead of an opaque NullPointerException.
      Assert.assertNotNull("Missing test resource: " + name, in);
      return IOUtils.toString(in, StandardCharsets.UTF_8);
    }
  }
}