blob: 53ab4db3cc5e0975647a987f8507a55e01e61ee3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.timeline.partition.NumberedOverwritingPartitionChunk;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.OvershadowableInteger;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
import org.joda.time.Interval;
import org.junit.Assert;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
 * Shared helpers for {@link VersionedIntervalTimelineTest} and
 * {@link VersionedIntervalTimelineSpecificDataTest}: assertion utilities, factories for partition chunks and
 * expected values, and thin wrappers for adding entries to the {@link #timeline} under test.
 *
 * Once Druid style permits static imports in tests, these helpers should move into
 * {@link VersionedIntervalTimelineTest} and be used statically from
 * {@link VersionedIntervalTimelineSpecificDataTest}.
 */
public class VersionedIntervalTimelineTestBase
{
  /**
   * Timeline that the instance helpers ({@link #checkRemove()} and the {@code add} overloads) operate on.
   * This class never assigns it; presumably each concrete test initializes it before use.
   */
  VersionedIntervalTimeline<String, OvershadowableInteger> timeline;

  /**
   * Asserts that {@code actual} matches {@code expected} position by position.
   *
   * The two lists must have the same size, and at every position the interval, the version, the number of
   * chunks, each chunk and each chunk's payload must be equal.
   *
   * @param expected expected (interval, (version, partition holder)) entries, in order
   * @param actual   holders returned by the timeline, in order
   */
  static void assertValues(
      List<Pair<Interval, Pair<String, PartitionHolder<OvershadowableInteger>>>> expected,
      List<TimelineObjectHolder<String, OvershadowableInteger>> actual
  )
  {
    Assert.assertEquals("Sizes did not match.", expected.size(), actual.size());

    // Sizes are equal at this point, so the actual iterator cannot run out before the expected entries do.
    final Iterator<TimelineObjectHolder<String, OvershadowableInteger>> actualHolders = actual.iterator();
    for (Pair<Interval, Pair<String, PartitionHolder<OvershadowableInteger>>> expectedEntry : expected) {
      final TimelineObjectHolder<String, OvershadowableInteger> actualHolder = actualHolders.next();
      Assert.assertEquals(expectedEntry.lhs, actualHolder.getInterval());
      Assert.assertEquals(expectedEntry.rhs.lhs, actualHolder.getVersion());
      assertSameChunks(expectedEntry.rhs.rhs, actualHolder.getObject());
    }
  }

  /**
   * Asserts that two partition holders contain the same number of chunks and that chunks at the same iteration
   * position are equal, both as chunks and by payload.
   */
  private static void assertSameChunks(
      PartitionHolder<OvershadowableInteger> expected,
      PartitionHolder<OvershadowableInteger> actual
  )
  {
    final List<PartitionChunk<OvershadowableInteger>> expectedChunks = Lists.newArrayList(expected);
    final List<PartitionChunk<OvershadowableInteger>> actualChunks = Lists.newArrayList(actual);
    Assert.assertEquals(expectedChunks.size(), actualChunks.size());

    final Iterator<PartitionChunk<OvershadowableInteger>> actualIter = actualChunks.iterator();
    for (PartitionChunk<OvershadowableInteger> expectedChunk : expectedChunks) {
      final PartitionChunk<OvershadowableInteger> actualChunk = actualIter.next();
      // Chunk equality is asserted before payload equality; presumably this surfaces partition number
      // mismatches first -- confirm against the chunk classes' equals().
      Assert.assertEquals(expectedChunk, actualChunk);
      Assert.assertEquals(expectedChunk.getObject(), actualChunk.getObject());
    }
  }

  /**
   * Order-insensitive variant of {@link #assertValues(List, List)}.
   *
   * Every actual holder is converted to the (interval, (version, partition holder)) form and the resulting set
   * is compared with {@code expected} using set equality.
   *
   * @param expected expected entries
   * @param actual   holders returned by the timeline
   */
  static void assertValues(
      Set<Pair<Interval, Pair<String, PartitionHolder<OvershadowableInteger>>>> expected,
      Set<TimelineObjectHolder<String, OvershadowableInteger>> actual
  )
  {
    Assert.assertEquals("Sizes did not match.", expected.size(), actual.size());

    final Iterable<Pair<Interval, Pair<String, PartitionHolder<OvershadowableInteger>>>> converted =
        Iterables.transform(actual, VersionedIntervalTimelineTestBase::toExpectedShape);
    final Set<Pair<Interval, Pair<String, PartitionHolder<OvershadowableInteger>>>> actualSet =
        Sets.newHashSet(converted);

    Assert.assertEquals(expected, actualSet);
  }

  /**
   * Repackages a holder as the nested pair used for expected values: (interval, (version, partition holder)).
   */
  private static Pair<Interval, Pair<String, PartitionHolder<OvershadowableInteger>>> toExpectedShape(
      TimelineObjectHolder<String, OvershadowableInteger> holder
  )
  {
    return new Pair<>(holder.getInterval(), new Pair<>(holder.getVersion(), holder.getObject()));
  }

  /**
   * Asserts that two chunks carry equal payloads after casting both to {@link SingleElementPartitionChunk}.
   *
   * @throws ClassCastException if either argument is not a {@link SingleElementPartitionChunk}
   */
  static void assertSingleElementChunks(
      PartitionChunk<OvershadowableInteger> expected,
      PartitionChunk<OvershadowableInteger> actual
  )
  {
    final SingleElementPartitionChunk<OvershadowableInteger> wanted =
        (SingleElementPartitionChunk<OvershadowableInteger>) expected;
    final SingleElementPartitionChunk<OvershadowableInteger> got =
        (SingleElementPartitionChunk<OvershadowableInteger>) actual;

    final OvershadowableInteger wantedValue = wanted.getObject();
    final OvershadowableInteger gotValue = got.getObject();
    Assert.assertEquals(wantedValue, gotValue);
  }

  /**
   * Creates an empty timeline whose String versions are compared with {@link Ordering#natural()}.
   */
  static VersionedIntervalTimeline<String, OvershadowableInteger> makeStringIntegerTimeline()
  {
    final Ordering<String> versionOrdering = Ordering.natural();
    return new VersionedIntervalTimeline<>(versionOrdering);
  }

  /**
   * Removes every chunk of every fully overshadowed holder from {@link #timeline}, then asserts that the
   * timeline reports no fully overshadowed holders any more.
   */
  void checkRemove()
  {
    for (TimelineObjectHolder<String, OvershadowableInteger> overshadowed : timeline.findFullyOvershadowed()) {
      // timeline.remove() modifies the PartitionHolder, so iterate over a snapshot of its chunks to avoid a
      // ConcurrentModificationException.
      final List<PartitionChunk<OvershadowableInteger>> snapshot =
          FluentIterable.from(overshadowed.getObject()).toList();
      snapshot.forEach(
          chunk -> timeline.remove(overshadowed.getInterval(), overshadowed.getVersion(), chunk)
      );
    }

    Assert.assertTrue(timeline.findFullyOvershadowed().isEmpty());
  }

  /**
   * Builds a {@link TimelineObjectHolder} for the given version and chunks. The interval string is parsed twice
   * and supplied for both interval arguments of the holder.
   *
   * @param interval interval string understood by {@link Intervals#of}
   * @param version  version of the holder
   * @param chunks   chunks wrapped into a new {@link PartitionHolder}
   */
  TimelineObjectHolder<String, OvershadowableInteger> makeTimelineObjectHolder(
      String interval,
      String version,
      List<PartitionChunk<OvershadowableInteger>> chunks
  )
  {
    final Interval firstInterval = Intervals.of(interval);
    final Interval secondInterval = Intervals.of(interval);
    final PartitionHolder<OvershadowableInteger> partitions = new PartitionHolder<>(chunks);
    return new TimelineObjectHolder<>(firstInterval, secondInterval, version, partitions);
  }

  /**
   * Builds an expected entry holding exactly one chunk, created via {@link #makeSingle(String, int)}.
   *
   * @param intervalString interval string understood by {@link Intervals#of}
   * @param version        expected version, also used as the chunk's major version
   * @param value          payload value; unboxed, so it must not be null
   */
  Pair<Interval, Pair<String, PartitionHolder<OvershadowableInteger>>> createExpected(
      String intervalString,
      String version,
      Integer value
  )
  {
    final PartitionChunk<OvershadowableInteger> onlyChunk = makeSingle(version, value);
    return createExpected(intervalString, version, Collections.singletonList(onlyChunk));
  }

  /**
   * Builds an expected entry of the form (interval, (version, partition holder over {@code values})).
   *
   * @param intervalString interval string understood by {@link Intervals#of}
   * @param version        expected version
   * @param values         chunks wrapped into a new {@link PartitionHolder}
   */
  Pair<Interval, Pair<String, PartitionHolder<OvershadowableInteger>>> createExpected(
      String intervalString,
      String version,
      List<PartitionChunk<OvershadowableInteger>> values
  )
  {
    final Interval expectedInterval = Intervals.of(intervalString);
    final PartitionHolder<OvershadowableInteger> expectedPartitions = new PartitionHolder<>(values);
    return Pair.of(expectedInterval, Pair.of(version, expectedPartitions));
  }

  /**
   * Shorthand for {@link #makeSingle(String, int, int)} with partition number 0.
   */
  public static PartitionChunk<OvershadowableInteger> makeSingle(String majorVersion, int value)
  {
    final int partitionNum = 0;
    return makeSingle(majorVersion, partitionNum, value);
  }

  /**
   * Wraps a new {@link OvershadowableInteger} in a {@link SingleElementPartitionChunk}.
   *
   * @param majorVersion major version passed to the payload
   * @param partitionNum partition number passed to the payload
   * @param val          value passed to the payload
   */
  public static PartitionChunk<OvershadowableInteger> makeSingle(String majorVersion, int partitionNum, int val)
  {
    final OvershadowableInteger payload = new OvershadowableInteger(majorVersion, partitionNum, val);
    return new SingleElementPartitionChunk<>(payload);
  }

  /**
   * Shorthand for {@link #makeNumbered(String, int, int, int)} with {@code chunks} set to 0.
   */
  public static PartitionChunk<OvershadowableInteger> makeNumbered(String majorVersion, int partitionNum, int val)
  {
    final int chunks = 0;
    return makeNumbered(majorVersion, partitionNum, chunks, val);
  }

  /**
   * Wraps a new {@link OvershadowableInteger} in a {@link NumberedPartitionChunk}.
   *
   * @param majorVersion major version passed to the payload
   * @param partitionNum partition number used for both the chunk and the payload
   * @param chunks       second constructor argument of the chunk
   * @param val          value passed to the payload
   */
  public static PartitionChunk<OvershadowableInteger> makeNumbered(
      String majorVersion,
      int partitionNum,
      int chunks,
      int val
  )
  {
    final OvershadowableInteger payload = new OvershadowableInteger(majorVersion, partitionNum, val);
    return new NumberedPartitionChunk<>(partitionNum, chunks, payload);
  }

  /**
   * Wraps a new {@link OvershadowableInteger} in a {@link NumberedOverwritingPartitionChunk}.
   *
   * The partition number used for both the chunk and the payload is
   * {@link PartitionIds#NON_ROOT_GEN_START_PARTITION_ID} plus {@code partitionNumOrdinal}.
   *
   * @param majorVersion          major version passed to the payload
   * @param partitionNumOrdinal   offset added to {@link PartitionIds#NON_ROOT_GEN_START_PARTITION_ID}
   * @param val                   value passed to the payload
   * @param startRootPartitionId  passed through to the payload
   * @param endRootPartitionId    passed through to the payload
   * @param minorVersion          passed through to the payload
   * @param atomicUpdateGroupSize passed through to the payload
   */
  public static PartitionChunk<OvershadowableInteger> makeNumberedOverwriting(
      String majorVersion,
      int partitionNumOrdinal,
      int val,
      int startRootPartitionId,
      int endRootPartitionId,
      int minorVersion,
      int atomicUpdateGroupSize
  )
  {
    final int partitionNum = PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + partitionNumOrdinal;
    final OvershadowableInteger payload = new OvershadowableInteger(
        majorVersion,
        partitionNum,
        val,
        startRootPartitionId,
        endRootPartitionId,
        minorVersion,
        atomicUpdateGroupSize
    );
    return new NumberedOverwritingPartitionChunk<>(partitionNum, payload);
  }

  /**
   * Parses {@code interval} and delegates to {@link #add(Interval, String, Integer)}.
   */
  void add(String interval, String version, Integer value)
  {
    final Interval parsed = Intervals.of(interval);
    add(parsed, version, value);
  }

  /**
   * Wraps {@code value} in a single-element chunk whose major version is {@code version} and delegates to
   * {@link #add(Interval, String, PartitionChunk)}. {@code value} is unboxed, so it must not be null.
   */
  void add(Interval interval, String version, Integer value)
  {
    final PartitionChunk<OvershadowableInteger> chunk = makeSingle(version, value);
    add(interval, version, chunk);
  }

  /**
   * Parses {@code interval} and delegates to {@link #add(Interval, String, PartitionChunk)}.
   */
  void add(String interval, String version, PartitionChunk<OvershadowableInteger> value)
  {
    final Interval parsed = Intervals.of(interval);
    add(parsed, version, value);
  }

  /**
   * Adds the chunk to {@link #timeline}. Every other {@code add} overload ends up here.
   */
  protected void add(Interval interval, String version, PartitionChunk<OvershadowableInteger> value)
  {
    timeline.add(interval, version, value);
  }
}