blob: 8888a69a0d89d8e4b1376377ac46320ea068c94f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
public class KafkaIndexTaskTuningConfigTest
{
private final ObjectMapper mapper;
public KafkaIndexTaskTuningConfigTest()
{
mapper = new DefaultObjectMapper();
mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules());
}
@Test
public void testSerdeWithDefaults() throws Exception
{
String jsonStr = "{\"type\": \"kafka\"}";
KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class
)
),
TuningConfig.class
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertNull(config.getMaxTotalRows());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
}
@Test
public void testSerdeWithNonDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxRowsInMemory\": 100,\n"
+ " \"maxRowsPerSegment\": 100,\n"
+ " \"maxTotalRows\": 1000,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class
)
),
TuningConfig.class
);
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertNotEquals(null, config.getMaxTotalRows());
Assert.assertEquals(1000, config.getMaxTotalRows().longValue());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec());
Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists());
}
@Test
public void testConvert()
{
KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
null,
1,
null,
2,
10L,
new Period("PT3S"),
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig) original.convertToTaskTuningConfig();
Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec());
Assert.assertEquals(1, copy.getMaxRowsInMemory());
Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
Assert.assertNotEquals(null, copy.getMaxTotalRows());
Assert.assertEquals(10L, copy.getMaxTotalRows().longValue());
Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod());
Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
Assert.assertEquals(4, copy.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
Assert.assertEquals(true, copy.isReportParseExceptions());
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
}
@Test
public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
{
KafkaIndexTaskTuningConfig base = new KafkaIndexTaskTuningConfig(
null,
1,
null,
2,
10L,
new Period("PT3S"),
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,
null,
null,
null,
true,
42,
42
);
String serialized = mapper.writeValueAsString(base);
TestModifiedKafkaIndexTaskTuningConfig deserialized =
mapper.readValue(serialized, TestModifiedKafkaIndexTaskTuningConfig.class);
Assert.assertEquals(null, deserialized.getExtra());
Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
}
@Test
public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
{
TestModifiedKafkaIndexTaskTuningConfig base = new TestModifiedKafkaIndexTaskTuningConfig(
null,
1,
null,
2,
10L,
new Period("PT3S"),
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,
null,
null,
null,
true,
42,
42,
"extra string"
);
String serialized = mapper.writeValueAsString(base);
KafkaIndexTaskTuningConfig deserialized =
mapper.readValue(serialized, KafkaIndexTaskTuningConfig.class);
Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
}
@Test
public void testEqualsAndHashCode()
{
EqualsVerifier.forClass(KafkaIndexTaskTuningConfig.class)
.usingGetClass()
.verify();
}
}