| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.server.coordinator.duty; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; |
| import org.apache.druid.indexer.partitions.PartitionsSpec; |
| import org.apache.druid.jackson.DefaultObjectMapper; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.Intervals; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.java.util.common.granularity.PeriodGranularity; |
| import org.apache.druid.java.util.common.guava.Comparators; |
| import org.apache.druid.segment.IndexSpec; |
| import org.apache.druid.segment.data.ConciseBitmapSerdeFactory; |
| import org.apache.druid.server.coordinator.DataSourceCompactionConfig; |
| import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; |
| import org.apache.druid.timeline.CompactionState; |
| import org.apache.druid.timeline.DataSegment; |
| import org.apache.druid.timeline.Partitions; |
| import org.apache.druid.timeline.VersionedIntervalTimeline; |
| import org.apache.druid.timeline.partition.NumberedShardSpec; |
| import org.apache.druid.timeline.partition.ShardSpec; |
| import org.assertj.core.api.Assertions; |
| import org.joda.time.DateTimeZone; |
| import org.joda.time.Interval; |
| import org.joda.time.Period; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TimeZone; |
| import java.util.stream.Collectors; |
| |
/**
 * Tests for {@link NewestSegmentFirstPolicy}: verifies which segments its iterator offers
 * for compaction, in what (newest-first) order, and which segments are skipped due to
 * skip offsets, skip intervals, size limits, or an already-matching compaction state.
 */
public class NewestSegmentFirstPolicyTest
{
  // Name of the single datasource used by most tests in this class.
  private static final String DATA_SOURCE = "dataSource";
  // Default segment size in bytes — presumably used by the createTimeline helper (defined
  // later in this file) when a SegmentGenerateSpec does not override it; confirm there.
  private static final long DEFAULT_SEGMENT_SIZE = 1000;
  // Default number of segments (shards) generated per time chunk.
  private static final int DEFAULT_NUM_SEGMENTS_PER_SHARD = 4;

  // Policy under test.
  private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper());

  // Mapper used by tests to serialize IndexSpec and similar objects into generic maps.
  private ObjectMapper mapper = new DefaultObjectMapper();
| |
| @Test |
| public void testLargeOffsetAndSmallSegmentInterval() |
| { |
| final Period segmentPeriod = new Period("PT1H"); |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"), null)), |
| ImmutableMap.of( |
| DATA_SOURCE, |
| createTimeline( |
| new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod), |
| new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod) |
| ) |
| ), |
| Collections.emptyMap() |
| ); |
| |
| assertCompactSegmentIntervals( |
| iterator, |
| segmentPeriod, |
| Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"), |
| Intervals.of("2017-11-15T03:00:00/2017-11-15T04:00:00"), |
| true |
| ); |
| } |
| |
| @Test |
| public void testSmallOffsetAndLargeSegmentInterval() |
| { |
| final Period segmentPeriod = new Period("PT1H"); |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"), null)), |
| ImmutableMap.of( |
| DATA_SOURCE, |
| createTimeline( |
| new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod), |
| new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod) |
| ) |
| ), |
| Collections.emptyMap() |
| ); |
| |
| assertCompactSegmentIntervals( |
| iterator, |
| segmentPeriod, |
| Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"), |
| Intervals.of("2017-11-17T02:00:00/2017-11-17T03:00:00"), |
| false |
| ); |
| |
| assertCompactSegmentIntervals( |
| iterator, |
| segmentPeriod, |
| Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"), |
| Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00"), |
| true |
| ); |
| } |
| |
| @Test |
| public void testLargeGapInData() |
| { |
| final Period segmentPeriod = new Period("PT1H"); |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"), null)), |
| ImmutableMap.of( |
| DATA_SOURCE, |
| createTimeline( |
| new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod), |
| // larger gap than SegmentCompactionUtil.LOOKUP_PERIOD (1 day) |
| new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod) |
| ) |
| ), |
| Collections.emptyMap() |
| ); |
| |
| assertCompactSegmentIntervals( |
| iterator, |
| segmentPeriod, |
| Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"), |
| Intervals.of("2017-11-17T01:00:00/2017-11-17T02:00:00"), |
| false |
| ); |
| |
| assertCompactSegmentIntervals( |
| iterator, |
| segmentPeriod, |
| Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"), |
| Intervals.of("2017-11-15T06:00:00/2017-11-15T07:00:00"), |
| true |
| ); |
| } |
| |
| @Test |
| public void testHugeShard() |
| { |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)), |
| ImmutableMap.of( |
| DATA_SOURCE, |
| createTimeline( |
| new SegmentGenerateSpec( |
| Intervals.of("2017-11-17T00:00:00/2017-11-18T03:00:00"), |
| new Period("PT1H"), |
| 200, |
| DEFAULT_NUM_SEGMENTS_PER_SHARD |
| ), |
| new SegmentGenerateSpec( |
| Intervals.of("2017-11-09T00:00:00/2017-11-17T00:00:00"), |
| new Period("P2D"), |
| 13000, // larger than target compact segment size |
| 1 |
| ), |
| new SegmentGenerateSpec( |
| Intervals.of("2017-11-05T00:00:00/2017-11-09T00:00:00"), |
| new Period("PT1H"), |
| 200, |
| DEFAULT_NUM_SEGMENTS_PER_SHARD |
| ) |
| ) |
| ), |
| Collections.emptyMap() |
| ); |
| |
| Interval lastInterval = null; |
| while (iterator.hasNext()) { |
| final List<DataSegment> segments = iterator.next(); |
| lastInterval = segments.get(0).getInterval(); |
| |
| Interval prevInterval = null; |
| for (DataSegment segment : segments) { |
| if (prevInterval != null && !prevInterval.getStart().equals(segment.getInterval().getStart())) { |
| Assert.assertEquals(prevInterval.getEnd(), segment.getInterval().getStart()); |
| } |
| |
| prevInterval = segment.getInterval(); |
| } |
| } |
| |
| Assert.assertNotNull(lastInterval); |
| Assert.assertEquals(Intervals.of("2017-11-05T00:00:00/2017-11-05T01:00:00"), lastInterval); |
| } |
| |
  @Test
  public void testManySegmentsPerShard()
  {
    // Timeline mixes hours with many small shards (80 or 150 per hour) and one huge 6-hour
    // chunk, to exercise batching across very different shard counts and segment sizes.
    final CompactionSegmentIterator iterator = policy.reset(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"), null)),
        ImmutableMap.of(
            DATA_SOURCE,
            createTimeline(
                new SegmentGenerateSpec(
                    Intervals.of("2017-12-04T01:00:00/2017-12-05T03:00:00"),
                    new Period("PT1H"),
                    375,
                    80
                ),
                new SegmentGenerateSpec(
                    Intervals.of("2017-12-04T00:00:00/2017-12-04T01:00:00"),
                    new Period("PT1H"),
                    200,
                    150
                ),
                new SegmentGenerateSpec(
                    Intervals.of("2017-12-03T18:00:00/2017-12-04T00:00:00"),
                    new Period("PT6H"),
                    200000,
                    1
                ),
                new SegmentGenerateSpec(
                    Intervals.of("2017-12-03T11:00:00/2017-12-03T18:00:00"),
                    new Period("PT1H"),
                    375,
                    80
                )
            )
        ),
        Collections.emptyMap()
    );

    Interval lastInterval = null;
    while (iterator.hasNext()) {
      final List<DataSegment> segments = iterator.next();
      lastInterval = segments.get(0).getInterval();

      // Verify each batch is contiguous: consecutive segments either share a start (same time
      // chunk, different shard) or abut exactly.
      Interval prevInterval = null;
      for (DataSegment segment : segments) {
        if (prevInterval != null && !prevInterval.getStart().equals(segment.getInterval().getStart())) {
          Assert.assertEquals(prevInterval.getEnd(), segment.getInterval().getStart());
        }

        prevInterval = segment.getInterval();
      }
    }

    // The iterator must have walked all the way back to the oldest hour of data.
    Assert.assertNotNull(lastInterval);
    Assert.assertEquals(Intervals.of("2017-12-03T11:00:00/2017-12-03T12:00:00"), lastInterval);
  }
| |
| @Test |
| public void testSkipUnknownDataSource() |
| { |
| final String unknownDataSource = "unknown"; |
| final Period segmentPeriod = new Period("PT1H"); |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of( |
| unknownDataSource, |
| createCompactionConfig(10000, new Period("P2D"), null), |
| DATA_SOURCE, |
| createCompactionConfig(10000, new Period("P2D"), null) |
| ), |
| ImmutableMap.of( |
| DATA_SOURCE, |
| createTimeline( |
| new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod), |
| new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod) |
| ) |
| ), |
| Collections.emptyMap() |
| ); |
| |
| assertCompactSegmentIntervals( |
| iterator, |
| segmentPeriod, |
| Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"), |
| Intervals.of("2017-11-15T03:00:00/2017-11-15T04:00:00"), |
| true |
| ); |
| } |
| |
| @Test |
| public void testClearSegmentsToCompactWhenSkippingSegments() |
| { |
| final long inputSegmentSizeBytes = 800000; |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec( |
| Intervals.of("2017-12-03T00:00:00/2017-12-04T00:00:00"), |
| new Period("P1D"), |
| inputSegmentSizeBytes / 2 + 10, |
| 1 |
| ), |
| new SegmentGenerateSpec( |
| Intervals.of("2017-12-02T00:00:00/2017-12-03T00:00:00"), |
| new Period("P1D"), |
| inputSegmentSizeBytes + 10, // large segment |
| 1 |
| ), |
| new SegmentGenerateSpec( |
| Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), |
| new Period("P1D"), |
| inputSegmentSizeBytes / 3 + 10, |
| 2 |
| ) |
| ); |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| |
| final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-03/2017-12-04"), Partitions.ONLY_COMPLETE) |
| ); |
| expectedSegmentsToCompact.sort(Comparator.naturalOrder()); |
| |
| final List<DataSegment> expectedSegmentsToCompact2 = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01/2017-12-02"), Partitions.ONLY_COMPLETE) |
| ); |
| expectedSegmentsToCompact2.sort(Comparator.naturalOrder()); |
| |
| Assertions.assertThat(iterator) |
| .toIterable() |
| .containsExactly(expectedSegmentsToCompact, expectedSegmentsToCompact2); |
| } |
| |
| @Test |
| public void testIfFirstSegmentIsInSkipOffset() |
| { |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec( |
| Intervals.of("2017-12-02T14:00:00/2017-12-03T00:00:00"), |
| new Period("PT5H"), |
| 40000, |
| 1 |
| ) |
| ); |
| |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
| @Test |
| public void testIfFirstSegmentOverlapsSkipOffset() |
| { |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec( |
| Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"), |
| new Period("PT5H"), |
| 40000, |
| 1 |
| ) |
| ); |
| |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
  @Test
  public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityEqual()
  {
    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
        new SegmentGenerateSpec(Intervals.of("2017-11-30T23:00:00/2017-12-03T00:00:00"), new Period("P1D")),
        new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D"))
    );

    final CompactionSegmentIterator iterator = policy.reset(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
    );

    // With DAY segmentGranularity (matching the segments) and a P1D skip offset, only the
    // most recent day (2017-12-02/2017-12-03) is skipped; everything up to 2017-12-02 —
    // both the October and the late-November/early-December segments — is returned.
    final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-12-02T00:00:00"), Partitions.ONLY_COMPLETE)
    );

    Assert.assertTrue(iterator.hasNext());
    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
  }
| |
| @Test |
| public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityLarger() |
| { |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| // This contains segment that |
| // - Cross between month boundary of latest month (starts in Nov and ends in Dec). This should be skipped |
| // - Fully in latest month (starts in Dec and ends in Dec). This should be skipped |
| // - Does not overlap latest month (starts in Oct and ends in Oct). This should not be skipped |
| new SegmentGenerateSpec(Intervals.of("2017-11-30T23:00:00/2017-12-03T00:00:00"), new Period("PT5H")), |
| new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H")) |
| ); |
| |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| |
| // We should only get segments in Oct |
| final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| |
| Assert.assertTrue(iterator.hasNext()); |
| List<DataSegment> actual = iterator.next(); |
| Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size()); |
| Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual)); |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
| @Test |
| public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularitySmaller() |
| { |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec(Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"), new Period("PT5H")), |
| new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H")) |
| ); |
| |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null))), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| |
| // We should only get segments in Oct |
| final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| |
| Assert.assertTrue(iterator.hasNext()); |
| Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator)))); |
| } |
| |
| @Test |
| public void testWithSkipIntervals() |
| { |
| final Period segmentPeriod = new Period("PT1H"); |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)), |
| ImmutableMap.of( |
| DATA_SOURCE, |
| createTimeline( |
| new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod), |
| new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod) |
| ) |
| ), |
| ImmutableMap.of( |
| DATA_SOURCE, |
| ImmutableList.of( |
| Intervals.of("2017-11-16T00:00:00/2017-11-17T00:00:00"), |
| Intervals.of("2017-11-15T00:00:00/2017-11-15T20:00:00"), |
| Intervals.of("2017-11-13T00:00:00/2017-11-14T01:00:00") |
| ) |
| ) |
| ); |
| |
| assertCompactSegmentIntervals( |
| iterator, |
| segmentPeriod, |
| Intervals.of("2017-11-15T20:00:00/2017-11-15T21:00:00"), |
| Intervals.of("2017-11-15T23:00:00/2017-11-16T00:00:00"), |
| false |
| ); |
| |
| assertCompactSegmentIntervals( |
| iterator, |
| segmentPeriod, |
| Intervals.of("2017-11-14T01:00:00/2017-11-14T02:00:00"), |
| Intervals.of("2017-11-14T23:00:00/2017-11-15T00:00:00"), |
| true |
| ); |
| } |
| |
  @Test
  public void testHoleInSearchInterval()
  {
    // One day of hourly data on 2017-11-16 with two skip intervals punched in the middle;
    // the iterator must return the three remaining windows, newest first.
    final Period segmentPeriod = new Period("PT1H");
    final CompactionSegmentIterator iterator = policy.reset(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)),
        ImmutableMap.of(
            DATA_SOURCE,
            createTimeline(
                new SegmentGenerateSpec(Intervals.of("2017-11-16T00:00:00/2017-11-17T00:00:00"), segmentPeriod)
            )
        ),
        ImmutableMap.of(
            DATA_SOURCE,
            ImmutableList.of(
                Intervals.of("2017-11-16T04:00:00/2017-11-16T10:00:00"),
                Intervals.of("2017-11-16T14:00:00/2017-11-16T20:00:00")
            )
        )
    );

    // Window after the second skip interval: 20:00-23:00 (the final hour is presumably
    // excluded by the PT1H skip offset — confirm against the policy implementation).
    assertCompactSegmentIntervals(
        iterator,
        segmentPeriod,
        Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"),
        Intervals.of("2017-11-16T22:00:00/2017-11-16T23:00:00"),
        false
    );

    // Window between the two skip intervals: 10:00-14:00.
    assertCompactSegmentIntervals(
        iterator,
        segmentPeriod,
        Intervals.of("2017-11-16T10:00:00/2017-11-16T11:00:00"),
        Intervals.of("2017-11-16T13:00:00/2017-11-16T14:00:00"),
        false
    );

    // Window before the first skip interval: 00:00-04:00.
    assertCompactSegmentIntervals(
        iterator,
        segmentPeriod,
        Intervals.of("2017-11-16T00:00:00/2017-11-16T01:00:00"),
        Intervals.of("2017-11-16T03:00:00/2017-11-16T04:00:00"),
        true
    );
  }
| |
| @Test |
| public void testIteratorReturnsSegmentsInConfiguredSegmentGranularity() |
| { |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| // Segments with day interval from Oct to Dec |
| new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D")) |
| ); |
| |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| |
| // We should get all segments in timeline back since skip offset is P0D. |
| // However, we only need to iterator 3 times (once for each month) since the new configured segmentGranularity is MONTH. |
| // and hence iterator would return all segments bucketed to the configured segmentGranularity |
| // Month of Dec |
| Assert.assertTrue(iterator.hasNext()); |
| List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-31T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertEquals( |
| ImmutableSet.copyOf(expectedSegmentsToCompact), |
| ImmutableSet.copyOf(iterator.next()) |
| ); |
| // Month of Nov |
| Assert.assertTrue(iterator.hasNext()); |
| expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-11-01T00:00:00/2017-12-01T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertEquals( |
| ImmutableSet.copyOf(expectedSegmentsToCompact), |
| ImmutableSet.copyOf(iterator.next()) |
| ); |
| // Month of Oct |
| Assert.assertTrue(iterator.hasNext()); |
| expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-11-01T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertEquals( |
| ImmutableSet.copyOf(expectedSegmentsToCompact), |
| ImmutableSet.copyOf(iterator.next()) |
| ); |
| // No more |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
| @Test |
| public void testIteratorReturnsSegmentsInMultipleIntervalIfConfiguredSegmentGranularityCrossBoundary() |
| { |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec(Intervals.of("2020-01-01/2020-01-08"), new Period("P7D")), |
| new SegmentGenerateSpec(Intervals.of("2020-01-28/2020-02-03"), new Period("P7D")), |
| new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D")) |
| ); |
| |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| // We should get the segment of "2020-01-28/2020-02-03" back twice when the iterator returns for Jan and when the |
| // iterator returns for Feb. |
| |
| // Month of Feb |
| List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-28/2020-02-15"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertTrue(iterator.hasNext()); |
| List<DataSegment> actual = iterator.next(); |
| Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size()); |
| Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual)); |
| // Month of Jan |
| expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-01/2020-02-03"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertTrue(iterator.hasNext()); |
| actual = iterator.next(); |
| Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size()); |
| Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual)); |
| // No more |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
| @Test |
| public void testIteratorDoesNotReturnCompactedInterval() |
| { |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D")) |
| ); |
| |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null))), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| |
| final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertTrue(iterator.hasNext()); |
| Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(iterator.next())); |
| // Iterator should return only once since all the "minute" interval of the iterator contains the same interval |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
| @Test |
| public void testIteratorReturnsAllMixedVersionSegmentsInConfiguredSegmentGranularity() |
| { |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), "1994-04-29T00:00:00.000Z", null), |
| new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null) |
| ); |
| |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| |
| // We should get all segments in timeline back since skip offset is P0D. |
| Assert.assertTrue(iterator.hasNext()); |
| List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertEquals( |
| ImmutableSet.copyOf(expectedSegmentsToCompact), |
| ImmutableSet.copyOf(iterator.next()) |
| ); |
| // No more |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
| @Test |
| public void testIteratorReturnsNothingAsSegmentsWasCompactedAndHaveSameSegmentGranularityAndSameTimezone() |
| { |
| // Same indexSpec as what is set in the auto compaction config |
| Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {}); |
| // Same partitionsSpec as what is set in the auto compaction config |
| PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); |
| |
| // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec( |
| Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), |
| new Period("P1D"), |
| null, |
| new CompactionState(partitionsSpec, indexSpec, null) |
| ), |
| new SegmentGenerateSpec( |
| Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), |
| new Period("P1D"), |
| null, |
| new CompactionState(partitionsSpec, indexSpec, null) |
| ) |
| ); |
| |
| // Auto compaction config sets segmentGranularity=DAY |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null))), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
  @Test
  public void testIteratorReturnsNothingAsSegmentsWasCompactedAndHaveSameSegmentGranularityInLastCompactionState()
  {
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
    // Same partitionsSpec as what is set in the auto compaction config
    // (note: "findPartitinosSpecFromConfig" is the actual, misspelled, method name in
    // NewestSegmentFirstIterator)
    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));

    // Create segments that were compacted (CompactionState != null) and whose
    // lastCompactionState explicitly records segmentGranularity=day in its granularity map
    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
        new SegmentGenerateSpec(
            Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
            new Period("P1D"),
            null,
            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
        ),
        new SegmentGenerateSpec(
            Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
            new Period("P1D"),
            null,
            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
        )
    );

    // Auto compaction config sets segmentGranularity=DAY, matching the recorded granularity,
    // so nothing should be offered for compaction.
    final CompactionSegmentIterator iterator = policy.reset(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
    );
    Assert.assertFalse(iterator.hasNext());
  }
| |
| @Test |
| public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentSegmentGranularity() |
| { |
| // Same indexSpec as what is set in the auto compaction config |
| Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {}); |
| // Same partitionsSpec as what is set in the auto compaction config |
| PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); |
| |
| // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec( |
| Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), |
| new Period("P1D"), |
| null, |
| new CompactionState(partitionsSpec, indexSpec, null) |
| ), |
| new SegmentGenerateSpec( |
| Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), |
| new Period("P1D"), |
| null, |
| new CompactionState(partitionsSpec, indexSpec, null) |
| ) |
| ); |
| |
| // Auto compaction config sets segmentGranularity=YEAR |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null))), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| // We should get all segments in timeline back since skip offset is P0D. |
| Assert.assertTrue(iterator.hasNext()); |
| List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-03T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertEquals( |
| ImmutableSet.copyOf(expectedSegmentsToCompact), |
| ImmutableSet.copyOf(iterator.next()) |
| ); |
| // No more |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
  @Test
  public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentSegmentGranularityInLastCompactionState()
  {
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
    // Same partitionsSpec as what is set in the auto compaction config
    // (note: "findPartitinosSpecFromConfig" is the actual, misspelled, method name in
    // NewestSegmentFirstIterator)
    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));

    // Create segments that were compacted (CompactionState != null) and whose
    // lastCompactionState explicitly records segmentGranularity=day
    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
        new SegmentGenerateSpec(
            Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
            new Period("P1D"),
            null,
            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
        ),
        new SegmentGenerateSpec(
            Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
            new Period("P1D"),
            null,
            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
        )
    );

    // Auto compaction config sets segmentGranularity=YEAR, which differs from the recorded
    // DAY, so the segments are eligible for recompaction.
    final CompactionSegmentIterator iterator = policy.reset(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
    );
    // We should get all segments in timeline back since skip offset is P0D.
    Assert.assertTrue(iterator.hasNext());
    List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-03T00:00:00"), Partitions.ONLY_COMPLETE)
    );
    Assert.assertEquals(
        ImmutableSet.copyOf(expectedSegmentsToCompact),
        ImmutableSet.copyOf(iterator.next())
    );
    // No more
    Assert.assertFalse(iterator.hasNext());
  }
| |
| @Test |
| public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentTimezone() |
| { |
| // Same indexSpec as what is set in the auto compaction config |
| Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {}); |
| // Same partitionsSpec as what is set in the auto compaction config |
| PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); |
| |
| // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec( |
| Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), |
| new Period("P1D"), |
| null, |
| new CompactionState(partitionsSpec, indexSpec, null) |
| ) |
| ); |
| |
| // Duration of new segmentGranularity is the same as before (P1D), |
| // but we changed the timezone from UTC to Bangkok in the auto compaction spec |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, |
| createCompactionConfig( |
| 130000, |
| new Period("P0D"), |
| new UserCompactionTaskGranularityConfig( |
| new PeriodGranularity( |
| new Period("P1D"), |
| null, |
| DateTimeZone.forTimeZone(TimeZone.getTimeZone("Asia/Bangkok")) |
| ), |
| null |
| ) |
| ) |
| ), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| // We should get all segments in timeline back since skip offset is P0D. |
| Assert.assertTrue(iterator.hasNext()); |
| List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-03T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertEquals( |
| ImmutableSet.copyOf(expectedSegmentsToCompact), |
| ImmutableSet.copyOf(iterator.next()) |
| ); |
| // No more |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
| @Test |
| public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentOrigin() |
| { |
| // Same indexSpec as what is set in the auto compaction config |
| Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {}); |
| // Same partitionsSpec as what is set in the auto compaction config |
| PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); |
| |
| // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec( |
| Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), |
| new Period("P1D"), |
| null, |
| new CompactionState(partitionsSpec, indexSpec, null) |
| ) |
| ); |
| |
| // Duration of new segmentGranularity is the same as before (P1D), but we changed the origin in the autocompaction spec |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, |
| createCompactionConfig( |
| 130000, |
| new Period("P0D"), |
| new UserCompactionTaskGranularityConfig( |
| new PeriodGranularity( |
| new Period("P1D"), |
| DateTimes.of("2012-01-02T00:05:00.000Z"), |
| DateTimeZone.UTC |
| ), |
| null |
| ) |
| ) |
| ), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| // We should get all segments in timeline back since skip offset is P0D. |
| Assert.assertTrue(iterator.hasNext()); |
| List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-03T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertEquals( |
| ImmutableSet.copyOf(expectedSegmentsToCompact), |
| ImmutableSet.copyOf(iterator.next()) |
| ); |
| // No more |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
| @Test |
| public void testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline() |
| { |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), "1994-04-29T00:00:00.000Z", null), |
| new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null) |
| ); |
| |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null))), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| |
| // We should get all segments in timeline back since skip offset is P0D. |
| // Although the first iteration only covers the last hour of 2017-10-01 (2017-10-01T23:00:00/2017-10-02T00:00:00), |
| // the iterator will returns all segment as the umbrella interval the DAY segment (2017-10-01T00:00:00/2017-10-02T00:00:00) |
| // also convers the HOUR segment (2017-10-01T01:00:00/2017-10-01T02:00:00) |
| Assert.assertTrue(iterator.hasNext()); |
| List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertEquals( |
| ImmutableSet.copyOf(expectedSegmentsToCompact), |
| ImmutableSet.copyOf(iterator.next()) |
| ); |
| // No more |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
| @Test |
| public void testIteratorReturnsSegmentsAsCompactionStateChangedWithCompactedStateHasSameSegmentGranularity() |
| { |
| // Different indexSpec as what is set in the auto compaction config |
| IndexSpec newIndexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null); |
| Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {}); |
| PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); |
| |
| // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY |
| final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( |
| new SegmentGenerateSpec( |
| Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), |
| new Period("P1D"), |
| null, |
| new CompactionState(partitionsSpec, newIndexSpecMap, null) |
| ) |
| ); |
| |
| // Duration of new segmentGranularity is the same as before (P1D) |
| final CompactionSegmentIterator iterator = policy.reset( |
| ImmutableMap.of(DATA_SOURCE, |
| createCompactionConfig( |
| 130000, |
| new Period("P0D"), |
| new UserCompactionTaskGranularityConfig( |
| new PeriodGranularity( |
| new Period("P1D"), |
| null, |
| DateTimeZone.UTC |
| ), |
| null |
| ) |
| ) |
| ), |
| ImmutableMap.of(DATA_SOURCE, timeline), |
| Collections.emptyMap() |
| ); |
| // We should get all segments in timeline back since indexSpec changed |
| Assert.assertTrue(iterator.hasNext()); |
| List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( |
| timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-03T00:00:00"), Partitions.ONLY_COMPLETE) |
| ); |
| Assert.assertEquals( |
| ImmutableSet.copyOf(expectedSegmentsToCompact), |
| ImmutableSet.copyOf(iterator.next()) |
| ); |
| // No more |
| Assert.assertFalse(iterator.hasNext()); |
| } |
| |
  /**
   * Drains {@code iterator} and asserts that each batch of segments it returns covers the expected
   * interval, walking backwards in time from {@code to} toward {@code from}, one {@code segmentPeriod}
   * per shard of {@code DEFAULT_NUM_SEGMENTS_PER_SHARD} segments.
   *
   * @param iterator      the compaction iterator under test
   * @param segmentPeriod the period each generated test segment covers
   * @param from          the oldest expected segment interval; iteration stops once it is reached
   * @param to            the newest expected segment interval, checked first
   * @param assertLast    if true, also assert the iterator is exhausted after the loop
   */
  private static void assertCompactSegmentIntervals(
      CompactionSegmentIterator iterator,
      Period segmentPeriod,
      Interval from,
      Interval to,
      boolean assertLast
  )
  {
    Interval expectedSegmentIntervalStart = to;
    while (iterator.hasNext()) {
      final List<DataSegment> segments = iterator.next();

      // All segments in one batch must share an interval or sit directly next to the first one.
      final Interval firstInterval = segments.get(0).getInterval();
      Assert.assertTrue(
          "Intervals should be same or abutting",
          segments.stream().allMatch(
              segment -> segment.getInterval().isEqual(firstInterval) || segment.getInterval().abuts(firstInterval)
          )
      );

      // Build the expected interval list: every DEFAULT_NUM_SEGMENTS_PER_SHARD segments step one
      // segmentPeriod further back in time.
      final List<Interval> expectedIntervals = new ArrayList<>(segments.size());
      for (int i = 0; i < segments.size(); i++) {
        if (i > 0 && i % DEFAULT_NUM_SEGMENTS_PER_SHARD == 0) {
          expectedSegmentIntervalStart = new Interval(segmentPeriod, expectedSegmentIntervalStart.getStart());
        }
        expectedIntervals.add(expectedSegmentIntervalStart);
      }
      // Batches are returned newest-first, but segments within a batch are compared in start order.
      expectedIntervals.sort(Comparators.intervalsByStartThenEnd());

      Assert.assertEquals(
          expectedIntervals,
          segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
      );

      // Stop once the walk has reached the oldest expected interval.
      if (expectedSegmentIntervalStart.equals(from)) {
        break;
      }
      expectedSegmentIntervalStart = new Interval(segmentPeriod, expectedSegmentIntervalStart.getStart());
    }

    if (assertLast) {
      Assert.assertFalse(iterator.hasNext());
    }
  }
| |
  /**
   * Builds a {@link VersionedIntervalTimeline} from the given specs. Each spec's
   * {@code totalInterval} is tiled from its END backwards with segments of {@code segmentPeriod}
   * length, creating {@code numSegmentsPerShard} numbered-shard segments per interval.
   * Specs are processed newest-first; segments with a null spec version share one
   * "now" version string.
   */
  private static VersionedIntervalTimeline<String, DataSegment> createTimeline(
      SegmentGenerateSpec... specs
  )
  {
    List<DataSegment> segments = new ArrayList<>();
    final String version = DateTimes.nowUtc().toString();

    // Sorting the Arrays.asList view is fine: it sorts the backing array in place.
    final List<SegmentGenerateSpec> orderedSpecs = Arrays.asList(specs);
    orderedSpecs.sort(Comparator.comparing(s -> s.totalInterval, Comparators.intervalsByStartThenEnd().reversed()));

    for (SegmentGenerateSpec spec : orderedSpecs) {
      Interval remainingInterval = spec.totalInterval;

      while (!Intervals.isEmpty(remainingInterval)) {
        final Interval segmentInterval;
        // Slice one segmentPeriod off the END of the remaining interval; the final slice may be shorter.
        if (remainingInterval.toDuration().isLongerThan(spec.segmentPeriod.toStandardDuration())) {
          segmentInterval = new Interval(spec.segmentPeriod, remainingInterval.getEnd());
        } else {
          segmentInterval = remainingInterval;
        }

        // One numbered shard per partition within this interval.
        for (int i = 0; i < spec.numSegmentsPerShard; i++) {
          final ShardSpec shardSpec = new NumberedShardSpec(i, spec.numSegmentsPerShard);
          final DataSegment segment = new DataSegment(
              DATA_SOURCE,
              segmentInterval,
              spec.version == null ? version : spec.version,
              null,
              ImmutableList.of(),
              ImmutableList.of(),
              shardSpec,
              spec.lastCompactionState,
              0,
              spec.segmentSize
          );
          segments.add(segment);
        }

        remainingInterval = SegmentCompactionUtil.removeIntervalFromEnd(remainingInterval, segmentInterval);
      }
    }

    return VersionedIntervalTimeline.forSegments(segments);
  }
| |
  /**
   * Builds a {@link DataSourceCompactionConfig} for {@code DATA_SOURCE} with the given input segment
   * size limit, skip offset, and granularity spec; every other field is left null (project defaults).
   */
  private DataSourceCompactionConfig createCompactionConfig(
      long inputSegmentSizeBytes,
      Period skipOffsetFromLatest,
      UserCompactionTaskGranularityConfig granularitySpec
  )
  {
    return new DataSourceCompactionConfig(
        DATA_SOURCE,
        0,
        inputSegmentSizeBytes,
        null,
        skipOffsetFromLatest,
        null,
        granularitySpec,
        null,
        null
    );
  }
| |
  /**
   * Specification consumed by {@code createTimeline}: describes a run of test segments covering
   * {@code totalInterval}, sliced into pieces of {@code segmentPeriod}, with
   * {@code numSegmentsPerShard} partitions per slice.
   */
  private static class SegmentGenerateSpec
  {
    // Full interval to be tiled with segments.
    private final Interval totalInterval;
    // Length of each individual segment interval.
    private final Period segmentPeriod;
    // Size in bytes reported by each generated segment.
    private final long segmentSize;
    // Number of numbered-shard partitions per segment interval (must be >= 1).
    private final int numSegmentsPerShard;
    // Segment version; null means "use the shared timeline version" (see createTimeline).
    private final String version;
    // Last compaction state; null marks the segments as never compacted.
    private final CompactionState lastCompactionState;

    SegmentGenerateSpec(Interval totalInterval, Period segmentPeriod)
    {
      this(totalInterval, segmentPeriod, null, null);
    }

    SegmentGenerateSpec(Interval totalInterval, Period segmentPeriod, String version, CompactionState lastCompactionState)
    {
      // Default segment size and shard count come from the test-class constants.
      this(totalInterval, segmentPeriod, DEFAULT_SEGMENT_SIZE, DEFAULT_NUM_SEGMENTS_PER_SHARD, version, lastCompactionState);
    }

    SegmentGenerateSpec(Interval totalInterval, Period segmentPeriod, long segmentSize, int numSegmentsPerShard)
    {
      this(totalInterval, segmentPeriod, segmentSize, numSegmentsPerShard, null, null);
    }

    SegmentGenerateSpec(Interval totalInterval, Period segmentPeriod, long segmentSize, int numSegmentsPerShard, String version, CompactionState lastCompactionState)
    {
      Preconditions.checkArgument(numSegmentsPerShard >= 1);
      this.totalInterval = totalInterval;
      this.segmentPeriod = segmentPeriod;
      this.segmentSize = segmentSize;
      this.numSegmentsPerShard = numSegmentsPerShard;
      this.version = version;
      this.lastCompactionState = lastCompactionState;
    }
  }
| } |