blob: 1911aa6bc5b2d33c105b212252514a32a4c12b31 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
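/**
 * Mostly a test of the segment validator ({@code validateTwoSegments}): each parameterized case builds two
 * incremental indexes from two lists of events and checks whether validating them against each other succeeds
 * or fails as expected. Also covers lazily loading a segment that contains a damaged file.
 */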
@RunWith(Parameterized.class)
public class IndexIOTest extends InitializedNullHandlingTest
{
// Interval passed to the IncrementalIndexAdapters; its start is also the minimum timestamp of the index schemas.
// Declared final: it is a constant and is never reassigned.
private static final Interval DEFAULT_INTERVAL = Intervals.of("1970-01-01/2000-01-01");

// Index spec whose bitmap serde factory supplies the bitmap factory for the adapters built in setUp().
private static final IndexSpec INDEX_SPEC = IndexMergerTestBase.makeIndexSpec(
    new ConciseBitmapSerdeFactory(),
    CompressionStrategy.LZ4,
    CompressionStrategy.LZ4,
    CompressionFactory.LongEncodingStrategy.LONGS
);

static {
  // NOTE(review): presumably already covered by extending InitializedNullHandlingTest; kept as in the original
  // so null handling is initialized before any static test data is built.
  NullHandling.initializeForTests();
}
/**
 * Selects the elements of {@code list} whose index is set in {@code bitSet}, preserving list order.
 * Set bits at or beyond {@code list.size()} are ignored.
 *
 * @param list   source elements
 * @param bitSet mask whose set bits name the indexes to keep
 * @return a new list holding the selected elements
 */
private static <T> List<T> filterByBitset(List<T> list, BitSet bitSet)
{
  final ArrayList<T> selected = new ArrayList<>(bitSet.cardinality());
  final int size = list.size();
  // Jump from set bit to set bit instead of probing every index; stop at the end of the list.
  for (int idx = bitSet.nextSetBit(0); idx >= 0 && idx < size; idx = bitSet.nextSetBit(idx + 1)) {
    selected.add(list.get(idx));
  }
  return selected;
}
/**
 * Builds the parameter sets for this test. Every entry is {@code {events1, events2, expectedException}}, where
 * {@code expectedException} is {@code null} when validating the two event lists is expected to succeed.
 *
 * <p>Two families of cases are generated from a fixed pool of six event maps:
 * <ul>
 * <li>every permutation of every non-empty subset of the pool, used as both event lists (expected to pass);</li>
 * <li>every ordered pair of non-empty subsets of the pool, expected to fail with
 * {@link SegmentValidationException} unless the two lists are equal once null values are filtered out.</li>
 * </ul>
 */
@Parameterized.Parameters(name = "{0}, {1}")
public static Iterable<Object[]> constructionFeeder()
{
  final Map<String, Object> map = ImmutableMap.of();
  final Map<String, Object> map00 = ImmutableMap.of(
      "dim0", ImmutableList.of("dim00", "dim01")
  );
  final Map<String, Object> map10 = ImmutableMap.of(
      "dim1", "dim10"
  );
  // Null-valued rows are built with HashMap (Guava's ImmutableMap does not accept null values).
  final Map<String, Object> map0null = new HashMap<>();
  map0null.put("dim0", null);
  final Map<String, Object> map1null = new HashMap<>();
  map1null.put("dim1", null);
  final Map<String, Object> mapAll = ImmutableMap.of(
      "dim0", ImmutableList.of("dim00", "dim01"),
      "dim1", "dim10"
  );
  final List<Map<String, Object>> maps = ImmutableList.of(map, map00, map10, map0null, map1null, mapAll);
  // Bitmasks in [1, subsetLimit) enumerate every non-empty subset of `maps`.
  final long subsetLimit = 1L << maps.size();
  return Iterables.concat(
      // First iterable tests permutations of the maps which are expected to be equal
      Iterables.concat(
          new Iterable<Iterable<Object[]>>()
          {
            @Override
            public Iterator<Iterable<Object[]>> iterator()
            {
              return new Iterator<Iterable<Object[]>>()
              {
                long nextBitset = 1L;

                @Override
                public boolean hasNext()
                {
                  return nextBitset < subsetLimit;
                }

                @Override
                public Iterable<Object[]> next()
                {
                  if (!hasNext()) {
                    throw new NoSuchElementException();
                  }
                  final BitSet bitset = BitSet.valueOf(new long[]{nextBitset++});
                  final List<Map<String, Object>> myMaps = filterByBitset(maps, bitset);
                  // The same permutation is used for both event lists, so validation must succeed.
                  return Collections2.transform(
                      Collections2.permutations(myMaps), new Function<List<Map<String, Object>>, Object[]>()
                      {
                        @Nullable
                        @Override
                        public Object[] apply(List<Map<String, Object>> input)
                        {
                          return new Object[]{input, input, null};
                        }
                      }
                  );
                }

                @Override
                public void remove()
                {
                  throw new UOE("Remove not supported");
                }
              };
            }
          }
      ),
      // Second iterable tests combinations of the maps which may or may not be equal
      Iterables.concat(
          new Iterable<Iterable<Object[]>>()
          {
            @Override
            public Iterator<Iterable<Object[]>> iterator()
            {
              return new Iterator<Iterable<Object[]>>()
              {
                long nextMap1Bits = 1L;

                @Override
                public boolean hasNext()
                {
                  return nextMap1Bits < subsetLimit;
                }

                @Override
                public Iterable<Object[]> next()
                {
                  if (!hasNext()) {
                    throw new NoSuchElementException();
                  }
                  final BitSet bitset1 = BitSet.valueOf(new long[]{nextMap1Bits++});
                  final List<Map<String, Object>> maplist1 = filterByBitset(maps, bitset1);
                  // Pair this first subset with every non-empty second subset.
                  return new Iterable<Object[]>()
                  {
                    @Override
                    public Iterator<Object[]> iterator()
                    {
                      return new Iterator<Object[]>()
                      {
                        long nextMap2Bits = 1L;

                        @Override
                        public boolean hasNext()
                        {
                          return nextMap2Bits < subsetLimit;
                        }

                        @Override
                        public Object[] next()
                        {
                          if (!hasNext()) {
                            throw new NoSuchElementException();
                          }
                          final List<Map<String, Object>> maplist2 = filterByBitset(
                              maps,
                              BitSet.valueOf(
                                  new long[]{nextMap2Bits++}
                              )
                          );
                          // Lists that are equal once null values are dropped are expected to validate.
                          return new Object[]{
                              maplist1,
                              maplist2,
                              filterNullValues(maplist1).equals(filterNullValues(maplist2)) ?
                              null : SegmentValidationException.class
                          };
                        }

                        @Override
                        public void remove()
                        {
                          throw new UOE("remove not supported");
                        }
                      };
                    }
                  };
                }

                @Override
                public void remove()
                {
                  throw new UOE("Remove not supported");
                }
              };
            }
          }
      )
  );
}
/**
 * Maps every entry of {@code mapList} through Guava's {@code Maps.filterValues} so that entries whose value is
 * {@code null} are left out. The result comes from {@code Lists.transform}.
 *
 * @param mapList event maps, possibly containing null values
 * @return the transformed list, in the same order as {@code mapList}
 */
public static List<Map> filterNullValues(List<Map<String, Object>> mapList)
{
  final Function<Map, Map> dropNullValues = input -> Maps.filterValues(input, Objects::nonNull);
  return Lists.transform(mapList, dropNullValues);
}
// Events added to incrementalIndex1 in setUp().
private final Collection<Map<String, Object>> events1;
// Events added to incrementalIndex2 in setUp().
private final Collection<Map<String, Object>> events2;
// Exception type testRowValidatorEquals() expects from validation, or null when validation should succeed.
private final Class<? extends Exception> exception;

/**
 * Receives one parameter set produced by {@code constructionFeeder()}.
 *
 * @param events1   events for the first index
 * @param events2   events for the second index
 * @param exception expected exception type, or {@code null} if none is expected
 */
public IndexIOTest(
    Collection<Map<String, Object>> events1,
    Collection<Map<String, Object>> events2,
    Class<? extends Exception> exception
)
{
  this.exception = exception;
  this.events2 = events2;
  this.events1 = events1;
}
// Two separately built indexes with identical schemas; setUp() fills them from events1 and events2.
final IncrementalIndex<Aggregator> incrementalIndex1 = makeIncrementalIndex();
final IncrementalIndex<Aggregator> incrementalIndex2 = makeIncrementalIndex();
// Adapters over the two indexes, created in setUp() and compared by the validator test.
IndexableAdapter adapter1;
IndexableAdapter adapter2;

/**
 * Creates an empty on-heap incremental index with a "count" metric and the default schemas for dimensions
 * "dim0" and "dim1". Both test indexes are built here so their schemas cannot drift apart.
 */
private static IncrementalIndex<Aggregator> makeIncrementalIndex()
{
  return new OnheapIncrementalIndex.Builder()
      .setIndexSchema(
          new IncrementalIndexSchema.Builder()
              .withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis())
              .withMetrics(new CountAggregatorFactory("count"))
              .withDimensionsSpec(
                  new DimensionsSpec(
                      DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
                      null,
                      null
                  )
              )
              .build()
      )
      .setMaxRowCount(1000000)
      .build();
}
/**
 * Loads {@code events1} and {@code events2} into their respective incremental indexes and wraps each index in
 * an {@link IncrementalIndexAdapter}.
 */
@Before
public void setUp() throws IndexSizeExceededException
{
  indexEvents(incrementalIndex1, events1);
  indexEvents(incrementalIndex2, events2);
  adapter1 = new IncrementalIndexAdapter(
      DEFAULT_INTERVAL,
      incrementalIndex1,
      INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory()
  );
  adapter2 = new IncrementalIndexAdapter(
      DEFAULT_INTERVAL,
      incrementalIndex2,
      INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory()
  );
}

// Adds each event as a row whose timestamp is its position in iteration order (0, 1, 2, ...) and whose
// dimensions are the event's own keys.
private static void indexEvents(IncrementalIndex<Aggregator> index, Collection<Map<String, Object>> events)
    throws IndexSizeExceededException
{
  long timestamp = 0L;
  for (Map<String, Object> event : events) {
    index.add(new MapBasedInputRow(timestamp, Lists.newArrayList(event.keySet()), event));
    timestamp++;
  }
}
/**
 * Validates the two adapters against each other. Passes when no exception is expected and none is thrown, or
 * when the thrown exception is an instance of the expected type; any other exception is rethrown.
 */
@Test
public void testRowValidatorEquals() throws Exception
{
  Exception thrown = null;
  try {
    TestHelper.getTestIndexIO().validateTwoSegments(adapter1, adapter2);
  }
  catch (Exception e) {
    thrown = e;
  }

  if (exception == null) {
    // Validation was expected to succeed: surface whatever went wrong.
    if (thrown != null) {
      throw thrown;
    }
    return;
  }

  Assert.assertNotNull("Exception was not thrown", thrown);
  if (!exception.isInstance(thrown)) {
    // Wrong kind of failure: rethrow so the real cause shows up in the test report.
    throw thrown;
  }
}
/**
 * Loads the "segmentWithDamagedFile" test resource with the second {@code loadIndex} argument set to
 * {@code true} and touches every column. Expects an exception to be raised along the way and the failure
 * callback to have flipped the fake segment's "exists" flag to false.
 */
@Test
public void testLoadSegmentDamagedFileWithLazy()
{
  final ObjectMapper jsonMapper = new DefaultObjectMapper();
  final IndexIO io = new IndexIO(jsonMapper, () -> 0);
  final File segmentDir = new File(
      this.getClass().getClassLoader().getResource("v9SegmentPersistDir/segmentWithDamagedFile/").getPath()
  );
  final ForkSegmentLoadDropHandler dropHandler = new ForkSegmentLoadDropHandler();
  final ForkSegment forkSegment = new ForkSegment(true);
  Assert.assertTrue(forkSegment.getSegmentExist());

  Exception failure = null;
  try {
    final QueryableIndex index = io.loadIndex(segmentDir, true, () -> dropHandler.removeSegment(forkSegment));
    Assert.assertNotNull(index);
    index.getDimensionHandlers();
    for (String column : index.getColumnNames()) {
      Assert.assertNotNull(index.getColumnHolder(column).toString());
    }
  }
  catch (Exception ex) {
    // Captured rather than propagated: the assertions below require that a failure happened.
    failure = ex;
  }
  Assert.assertNotNull(failure);
  Assert.assertFalse(forkSegment.getSegmentExist());
}
/**
 * Test stand-in whose {@code removeSegment} is used as the load-failure callback in
 * {@code testLoadSegmentDamagedFileWithLazy}.
 */
private static class ForkSegmentLoadDropHandler
{
  /** Marks the given segment as no longer existing. */
  public void removeSegment(ForkSegment segment)
  {
    segment.setSegmentExist(Boolean.FALSE);
  }

  /** Does nothing. */
  public void addSegment()
  {
  }
}
/**
 * Test stand-in for a segment: holds a single "exists" flag that can be read and overwritten.
 */
private static class ForkSegment
{
  private Boolean exists;

  ForkSegment(Boolean segmentExist)
  {
    exists = segmentExist;
  }

  Boolean getSegmentExist()
  {
    return exists;
  }

  void setSegmentExist(Boolean value)
  {
    exists = value;
  }
}
}