blob: 50c318683e8ffb18100bccd6e3ef7e464a87abf0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.concurrent;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TestTaskToolboxFactory;
import org.apache.druid.indexing.overlord.ThreadingTaskRunner;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Contains tests to verify behaviour of concurrently running REPLACE and APPEND
* tasks on the same interval of a datasource.
* <p>
* The tests verify the interleaving of the following actions:
* <ul>
* <li>LOCK: Acquisition of a lock on an interval by a replace task</li>
* <li>ALLOCATE: Allocation of a pending segment by an append task</li>
* <li>REPLACE: Commit of segments created by a replace task</li>
* <li>APPEND: Commit of segments created by an append task</li>
* </ul>
*/
public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
{
/**
* The version used by append jobs when no previous replace job has run on an interval.
*/
private static final String SEGMENT_V0 = DateTimes.EPOCH.toString();
// Month and day intervals on which the tests acquire locks, allocate and commit segments
private static final Interval JAN_23 = Intervals.of("2023-01/2023-02");
private static final Interval FIRST_OF_JAN_23 = Intervals.of("2023-01-01/2023-01-02");
// Name of the datasource used by every task, segment and retrieval in this class
private static final String WIKI = "wiki";
// Queue to which every task created by createAndStartTask() is added
private TaskQueue taskQueue;
private TaskActionClientFactory taskActionClientFactory;
// Action client created for a NoopTask, used only to retrieve used segments for verification
private TaskActionClient dummyTaskActionClient;
// All tasks created in the current test; each of them is finished in tearDown()
private final List<ActionsTestTask> runningTasks = new ArrayList<>();
private ActionsTestTask appendTask;
private ActionsTestTask replaceTask;
// Counter used to build a distinct "test_N" string for every created task
private final AtomicInteger groupId = new AtomicInteger(0);
private final SupervisorManager supervisorManager = EasyMock.mock(SupervisorManager.class);
// Captured arguments of registerNewVersionOfPendingSegmentOnSupervisor() calls made on the mock
private Capture<String> supervisorId;
private Capture<SegmentIdWithShardSpec> oldPendingSegment;
private Capture<SegmentIdWithShardSpec> newPendingSegment;
// Load specs seen so far per version and interval, filled by verifyVersionIntervalLoadSpecUniqueness()
private Map<String, Map<Interval, Set<Object>>> versionToIntervalToLoadSpecs;
// Load spec value of every segment passed to commitAppendSegments(), keyed by segment id
private Map<SegmentId, Object> parentSegmentToLoadSpec;
/**
* Runs the base class set up and then builds the fixtures used by every test:
* the supervisor manager mock, a {@link ThreadingTaskRunner}, a started
* {@link TaskQueue}, and one append and one replace {@link ActionsTestTask}.
*/
@Override
@Before
public void setUpIngestionTestBase() throws IOException
{
// Drop the expectations recorded on the shared mock by any previous test
EasyMock.reset(supervisorManager);
// Always answer with "wiki" as the active supervisor id for the datasource
EasyMock.expect(supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(WIKI))
.andReturn(Optional.of(WIKI)).anyTimes();
super.setUpIngestionTestBase();
final TaskConfig taskConfig = new TaskConfigBuilder().build();
taskActionClientFactory = createActionClientFactory();
// Client bound to a NoopTask, used by the verification helpers to retrieve segments
dummyTaskActionClient = taskActionClientFactory.create(NoopTask.create());
final WorkerConfig workerConfig = new WorkerConfig().setCapacity(10);
TaskRunner taskRunner = new ThreadingTaskRunner(
createToolboxFactory(taskConfig, taskActionClientFactory),
taskConfig,
workerConfig,
new NoopTaskLogs(),
getObjectMapper(),
new TestAppenderatorsManager(),
new MultipleFileTaskReportFileWriter(),
new DruidNode("middleManager", "host", false, 8091, null, true, false),
TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
);
taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, new Period(0L), null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
taskRunner,
taskActionClientFactory,
getLockbox(),
new NoopServiceEmitter(),
getObjectMapper(),
new NoopTaskContextEnricher()
);
// Reset the per-test state before any task is created
runningTasks.clear();
taskQueue.start();
groupId.set(0);
appendTask = createAndStartTask();
// CaptureType.ALL keeps the arguments of every call rather than only the last one
supervisorId = Capture.newInstance(CaptureType.ALL);
oldPendingSegment = Capture.newInstance(CaptureType.ALL);
newPendingSegment = Capture.newInstance(CaptureType.ALL);
// Capture the arguments of every pending segment upgrade registered on the supervisor manager
EasyMock.expect(supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
EasyMock.capture(supervisorId),
EasyMock.capture(oldPendingSegment),
EasyMock.capture(newPendingSegment)
)).andReturn(true).anyTimes();
replaceTask = createAndStartTask();
// Switch the mock from record mode to replay mode now that all expectations are in place
EasyMock.replay(supervisorManager);
versionToIntervalToLoadSpecs = new HashMap<>();
parentSegmentToLoadSpec = new HashMap<>();
}
/**
 * Verifies the uniqueness of load specs across the segments left behind by
 * the test and then finishes every task created via {@link #createAndStartTask()}.
 * <p>
 * The tasks are finished in a {@code finally} block so that a failed
 * verification does not leave the tasks of this test unfinished.
 */
@After
public void tearDown()
{
  try {
    verifyVersionIntervalLoadSpecUniqueness();
  }
  finally {
    for (ActionsTestTask task : runningTasks) {
      task.finishRunAndGetStatus();
    }
  }
}
/**
* Order of actions: LOCK (day), REPLACE (day), ALLOCATE (day), APPEND.
* Expects the pending segment to be allocated with the version of the replace
* segment, and both segments to be used and visible afterwards.
*/
@Test
public void testLockReplaceAllocateAppend()
{
// LOCK + REPLACE: commit a replace segment for the day under the replace lock version
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
// ALLOCATE: the new pending segment must carry the version of the committed replace segment
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(segmentV10.getVersion(), pendingSegment.getVersion());
// APPEND: commit the allocated segment
final DataSegment segmentV11 = asSegment(pendingSegment);
commitAppendSegments(segmentV11);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: LOCK (day), ALLOCATE (day), APPEND, REPLACE (day).
* Expects the segment appended on v0 to remain used and an upgraded copy of it
* on the replace version to become visible together with the replace segment.
*/
@Test
public void testLockAllocateAppendDayReplaceDay()
{
// LOCK
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
// ALLOCATE: nothing has been replaced yet, so the pending segment is expected on v0
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// APPEND
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
// REPLACE
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that the segment appended to v0 gets upgraded to v1
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.shardSpec(new NumberedShardSpec(1, 1))
.version(v1).build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: LOCK (day), ALLOCATE (day), REPLACE (day), APPEND.
* Expects the segment appended on v0 after the replace to be used, and an
* upgraded copy of it on the replace version to be used and visible.
*/
@Test
public void testLockAllocateReplaceDayAppendDay()
{
// LOCK
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
// ALLOCATE: the pending segment is expected on v0
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// REPLACE
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
// APPEND: commit the segment that was allocated before the replace
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
// Verify that the segment appended to v0 gets upgraded to v1
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.shardSpec(new NumberedShardSpec(1, 1))
.version(v1).build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: ALLOCATE (day), LOCK (day), REPLACE (day), APPEND.
* Expects the segment appended on v0 after the replace to be used, and an
* upgraded copy of it on the replace version to be used and visible.
*/
@Test
public void testAllocateLockReplaceDayAppendDay()
{
// ALLOCATE: the pending segment is expected on v0
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// LOCK + REPLACE
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
// APPEND: commit the segment that was allocated before the replace
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
// Verify that the segment appended to v0 gets upgraded to v1
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.shardSpec(new NumberedShardSpec(1, 1))
.version(v1).build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
/**
 * Order of actions: ALLOCATE (day), LOCK (day), APPEND, REPLACE (day).
 * Expects the segment appended on v0 to remain used and an upgraded copy of it
 * on the replace version to become visible together with the replace segment.
 */
@Test
public void testAllocateLockAppendDayReplaceDay()
{
  // ALLOCATE: the pending segment is expected on v0
  final SegmentIdWithShardSpec pendingSegment
      = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
  Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());

  // LOCK
  final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();

  // APPEND
  final DataSegment segmentV01 = asSegment(pendingSegment);
  commitAppendSegments(segmentV01);

  verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
  verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);

  // REPLACE: the helper of this class already finishes the replace task after
  // committing, so no explicit finishRunAndGetStatus() call is needed here
  final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
  commitReplaceSegments(segmentV10);

  // Verify that the segment appended to v0 gets upgraded to v1
  final DataSegment segmentV11 = DataSegment.builder(segmentV01)
                                            .shardSpec(new NumberedShardSpec(1, 1))
                                            .version(v1).build();
  verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
  verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: ALLOCATE (day), APPEND, LOCK (day), REPLACE (day).
* Expects the segment appended on v0 to stay used but to no longer be visible
* once the replace segment is committed, with no upgraded copy being created.
*/
@Test
public void testAllocateAppendDayLockReplaceDay()
{
// ALLOCATE: the pending segment is expected on v0
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// APPEND
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
// LOCK + REPLACE: the replace lock is acquired only after the append has been committed
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that the segment appended to v0 gets fully overshadowed
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
}
/**
* Order of actions: LOCK (month), REPLACE (month), ALLOCATE (day), APPEND.
* Expects the pending segment requested with DAY granularity to be allocated
* on the month interval and version of the committed replace segment.
*/
@Test
public void testLockReplaceMonthAllocateAppendDay()
{
// LOCK + REPLACE on the whole month
String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
// Verify that the allocated segment takes the version and interval of previous replace
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(JAN_23, pendingSegment.getInterval());
Assert.assertEquals(v1, pendingSegment.getVersion());
// APPEND
final DataSegment segmentV11 = asSegment(pendingSegment);
commitAppendSegments(segmentV11);
verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: LOCK (month), ALLOCATE (day), APPEND, REPLACE (month).
* Expects the day segment appended on v0 to remain used and an upgraded copy
* of it, on the replace version and month interval, to become visible.
*/
@Test
public void testLockAllocateAppendDayReplaceMonth()
{
// LOCK on the whole month
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
// ALLOCATE: the pending segment is expected on the day interval with v0
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// APPEND
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
// REPLACE
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: LOCK (month), ALLOCATE (day), REPLACE (month), APPEND.
* Expects the day segment appended on v0 after the replace to be used and an
* upgraded copy of it, on the replace version and month interval, to be visible.
*/
@Test
public void testLockAllocateReplaceMonthAppendDay()
{
// LOCK on the whole month
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
// ALLOCATE: the pending segment is expected on the day interval with v0
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// REPLACE
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
// APPEND: commit the segment that was allocated before the replace
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: ALLOCATE (day), LOCK (month), REPLACE (month), APPEND.
* Expects the day segment appended on v0 after the replace to be used and an
* upgraded copy of it, on the replace version and month interval, to be visible.
*/
@Test
public void testAllocateLockReplaceMonthAppendDay()
{
// ALLOCATE: the pending segment is expected on the day interval with v0
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// LOCK + REPLACE on the whole month
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
// APPEND: commit the segment that was allocated before the replace
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: ALLOCATE (day), LOCK (month), APPEND, REPLACE (month).
* Expects the day segment appended on v0 to remain used and an upgraded copy
* of it, on the replace version and month interval, to become visible.
*/
@Test
public void testAllocateLockAppendDayReplaceMonth()
{
// ALLOCATE: the pending segment is expected on the day interval with v0
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// LOCK on the whole month
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
// APPEND
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
// REPLACE
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: ALLOCATE (day), APPEND, LOCK (month), REPLACE (month).
* Expects the day segment appended on v0 to stay used but to no longer be
* visible once the month replace segment is committed.
*/
@Test
public void testAllocateAppendDayLockReplaceMonth()
{
// ALLOCATE: the pending segment is expected on the day interval with v0
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// APPEND
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
// LOCK + REPLACE: the replace lock is acquired only after the append has been committed
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that the old segment gets completely replaced
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10);
}
/**
* Order of actions: LOCK (day), REPLACE (day), ALLOCATE (MONTH requested), APPEND.
* Expects an APPEND lock on the month to be refused and the pending segment
* to be allocated on the day interval with the replace version.
*/
@Test
public void testLockReplaceDayAllocateAppendMonth()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
// Commits directly through the task rather than through the commitReplaceSegments()
// helper of this class, so the replace task is not finished at this point.
// NOTE(review): presumably required so that the replace lock is still held for the
// APPEND lock check below - confirm
replaceTask.commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
// Verify that an APPEND lock cannot be acquired on month
TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
Assert.assertNull(appendLock);
// Verify that new segment gets allocated with DAY granularity even though preferred was MONTH
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(v1, pendingSegment.getVersion());
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
// APPEND
final DataSegment segmentV11 = asSegment(pendingSegment);
commitAppendSegments(segmentV11);
verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: LOCK (day), ALLOCATE (MONTH requested), APPEND, REPLACE (day).
* Expects the pending segment to be allocated on the day interval with v0 and
* an upgraded copy of the appended segment to become visible after the replace.
*/
@Test
public void testLockAllocateAppendMonthReplaceDay()
{
// LOCK on the day
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
// Verify that an APPEND lock cannot be acquired on month
TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
Assert.assertNull(appendLock);
// Verify that the segment is allocated for DAY granularity
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// APPEND
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
// REPLACE
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: LOCK (day), ALLOCATE (MONTH requested), REPLACE (day), APPEND.
* Expects the pending segment to be allocated on the day interval with v0 and
* an upgraded copy of the appended segment to be visible after the append.
*/
@Test
public void testLockAllocateReplaceDayAppendMonth()
{
// LOCK on the day
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
// Verify that an APPEND lock cannot be acquired on month
TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
Assert.assertNull(appendLock);
// Verify that the segment is allocated for DAY granularity instead of MONTH
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// REPLACE
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
// APPEND: commit the segment that was allocated before the replace
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.interval(FIRST_OF_JAN_23)
.version(v1)
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
/**
* Order of actions: ALLOCATE (month), LOCK (day, refused), REPLACE (day, fails), APPEND.
* Expects the replace lock on the day to be refused, the replace commit to
* fail for lack of a covering lock, and the month append segment to be the
* only used and visible segment.
*/
@Test
public void testAllocateLockReplaceDayAppendMonth()
{
// ALLOCATE: the pending segment is expected on the month interval with v0
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// Verify that replace lock cannot be acquired on DAY as a segment is already allocated on MONTH
TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
Assert.assertNull(replaceLock);
// Verify that segment cannot be committed since there is no lock
// (no lock version is available, so the replace segment is created with v0)
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, SEGMENT_V0);
final ISE exception = Assert.assertThrows(ISE.class, () -> commitReplaceSegments(segmentV10));
final Throwable throwable = Throwables.getRootCause(exception);
Assert.assertEquals(
StringUtils.format(
"Segments[[%s]] are not covered by locks[[]] for task[%s]",
segmentV10, replaceTask.getId()
),
throwable.getMessage()
);
// APPEND
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
}
/**
* Order of actions: ALLOCATE (month), APPEND, LOCK (day, refused).
* Expects the month append segment to be used and visible and the replace
* lock on the day to be refused.
*/
@Test
public void testAllocateAppendMonthLockReplaceDay()
{
// ALLOCATE: the pending segment is expected on the month interval with v0
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
// Commits directly through the task rather than through the commitAppendSegments()
// helper of this class, so the append task is not finished at this point.
// NOTE(review): presumably required so that the month is still locked by the append
// task for the check below - confirm
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
// Verify that replace lock cannot be acquired on DAY as MONTH is already locked
final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
Assert.assertNull(replaceLock);
}
/**
 * Order of actions: ALLOCATE (day), LOCK (month), REPLACE (month),
 * ALLOCATE (day), APPEND of both allocated segments. Note that, despite the
 * method name, the first allocation happens before the replace lock is acquired.
 * <p>
 * Expects the append commit to return three segments: the two committed
 * segments and an upgraded copy of the v0 segment on the replace version and
 * month interval which shares the load spec of the v0 segment.
 */
@Test
public void testLockAllocateDayReplaceMonthAllocateAppend()
{
  // ALLOCATE: first pending segment, allocated before any replace has happened
  final SegmentIdWithShardSpec pendingSegmentV0
      = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);

  // LOCK + REPLACE on the whole month
  final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();

  final DataSegment segmentV10 = createSegment(JAN_23, v1);
  commitReplaceSegments(segmentV10);
  verifyIntervalHasUsedSegments(JAN_23, segmentV10);

  // ALLOCATE: second pending segment, expected on the version of the replace segment
  final SegmentIdWithShardSpec pendingSegmentV1
      = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
  Assert.assertEquals(segmentV10.getVersion(), pendingSegmentV1.getVersion());

  final DataSegment segmentV00 = asSegment(pendingSegmentV0);
  final DataSegment segmentV11 = asSegment(pendingSegmentV1);

  // APPEND: work on a copy so that the set owned by the publish result is not
  // modified by the removals below
  final Set<DataSegment> appendSegments
      = new HashSet<>(commitAppendSegments(segmentV00, segmentV11).getSegments());

  Assert.assertEquals(3, appendSegments.size());
  // Segment V11 is committed
  Assert.assertTrue(appendSegments.remove(segmentV11));
  // Segment V00 is also committed
  Assert.assertTrue(appendSegments.remove(segmentV00));
  // Segment V00 is upgraded to v1 with MONTH granularity at the time of commit as V12
  final DataSegment segmentV12 = Iterables.getOnlyElement(appendSegments);
  Assert.assertEquals(v1, segmentV12.getVersion());
  Assert.assertEquals(JAN_23, segmentV12.getInterval());
  Assert.assertEquals(segmentV00.getLoadSpec(), segmentV12.getLoadSpec());

  verifyIntervalHasUsedSegments(JAN_23, segmentV00, segmentV10, segmentV11, segmentV12);
  verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, segmentV12);
}
/**
 * Finds a segment with the given version and load spec.
 *
 * @param version  version that the segment must have
 * @param loadSpec load spec that the segment must have, compared with {@link Objects#equals}
 * @param segments segments to search through
 * @return the first matching segment in the iteration order of {@code segments},
 * or null if none of them matches
 */
@Nullable
private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments)
{
  return segments.stream()
                 .filter(candidate -> version.equals(candidate.getVersion()))
                 .filter(candidate -> Objects.equals(candidate.getLoadSpec(), loadSpec))
                 .findFirst()
                 .orElse(null);
}
/**
 * Builds a {@link DataSegment} for the given pending segment. The segment
 * uses the id and shard spec of the pending segment, has no dimensions or
 * metrics, and carries a single-entry load spec whose key and value are both
 * the string form of the segment id.
 */
private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
{
  final SegmentId segmentId = pendingSegment.asSegmentId();
  final String idString = segmentId.toString();
  final Map<String, Object> loadSpec = Collections.singletonMap(idString, idString);
  return new DataSegment(
      segmentId,
      loadSpec,
      Collections.emptyList(),
      Collections.emptyList(),
      pendingSegment.getShardSpec(),
      null,
      0,
      0
  );
}
/**
* Asserts that the used segments retrieved for the given interval, including
* overshadowed ones, are exactly the expected segments.
*/
private void verifyIntervalHasUsedSegments(Interval interval, DataSegment... expectedSegments)
{
verifySegments(interval, Segments.INCLUDING_OVERSHADOWED, expectedSegments);
}
/**
* Asserts that the visible used segments retrieved for the given interval
* are exactly the expected segments.
*/
private void verifyIntervalHasVisibleSegments(Interval interval, DataSegment... expectedSegments)
{
verifySegments(interval, Segments.ONLY_VISIBLE, expectedSegments);
}
/**
 * Retrieves the used segments of the test datasource for the given interval
 * with the given visibility and asserts that, compared as sets, they are
 * equal to the expected segments.
 *
 * @throws ISE if the retrieval fails with an {@link IOException}
 */
private void verifySegments(Interval interval, Segments visibility, DataSegment... expectedSegments)
{
  final Collection<DataSegment> retrievedSegments;
  try {
    final RetrieveUsedSegmentsAction retrieveAction = new RetrieveUsedSegmentsAction(
        WIKI,
        null,
        ImmutableList.of(interval),
        visibility
    );
    retrievedSegments = dummyTaskActionClient.submit(retrieveAction);
  }
  catch (IOException e) {
    throw new ISE(e, "Error while fetching used segments in interval[%s]", interval);
  }

  // Order is irrelevant, so both sides are compared as sets
  final Set<DataSegment> expected = Sets.newHashSet(expectedSegments);
  final Set<DataSegment> actual = Sets.newHashSet(retrievedSegments);
  Assert.assertEquals(expected, actual);
}
/**
 * Retrieves the used segments of the test datasource for the given interval
 * through an action client created for the given task and asserts that,
 * compared as sets, they are equal to the expected segments.
 *
 * @throws ISE if the retrieval fails with an {@link IOException}
 */
private void verifyInputSegments(Task task, Interval interval, DataSegment... expectedSegments)
{
  final Collection<DataSegment> retrievedSegments;
  try {
    final TaskActionClient clientForTask = taskActionClientFactory.create(task);
    final RetrieveUsedSegmentsAction retrieveAction = new RetrieveUsedSegmentsAction(
        WIKI,
        Collections.singletonList(interval)
    );
    retrievedSegments = clientForTask.submit(retrieveAction);
  }
  catch (IOException e) {
    throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval);
  }

  // Order is irrelevant, so both sides are compared as sets
  final Set<DataSegment> expected = Sets.newHashSet(expectedSegments);
  final Set<DataSegment> actual = Sets.newHashSet(retrievedSegments);
  Assert.assertEquals(expected, actual);
}
/**
 * Creates a toolbox factory, configured with the given task config and
 * action client factory, whose toolboxes are built by
 * {@code createTaskToolbox} with the supervisor manager mock of this test.
 */
private TaskToolboxFactory createToolboxFactory(
    TaskConfig taskConfig,
    TaskActionClientFactory taskActionClientFactory
)
{
  final IndexIO indexIO = new IndexIO(getObjectMapper(), ColumnConfig.DEFAULT);
  final TestTaskToolboxFactory.Builder factoryBuilder = new TestTaskToolboxFactory.Builder()
      .setConfig(taskConfig)
      .setIndexIO(indexIO)
      .setTaskActionClientFactory(taskActionClientFactory);

  return new TestTaskToolboxFactory(factoryBuilder)
  {
    @Override
    public TaskToolbox build(TaskConfig config, Task task)
    {
      // Every toolbox gets the supervisor manager mock of this test
      return createTaskToolbox(config, task, supervisorManager);
    }
  };
}
/**
 * Creates a segment of the test datasource with the given interval and
 * version and a size of 100. The load spec has a single entry whose key and
 * value are both the string form of the corresponding segment id.
 */
private DataSegment createSegment(Interval interval, String version)
{
  final String idString = SegmentId.of(WIKI, interval, version, null).toString();
  final Map<String, Object> loadSpec = Collections.singletonMap(idString, idString);
  return DataSegment.builder()
                    .dataSource(WIKI)
                    .interval(interval)
                    .version(version)
                    .loadSpec(loadSpec)
                    .size(100)
                    .build();
}
/**
 * Creates a new {@link ActionsTestTask} for the test datasource, adds it to
 * the task queue and remembers it in {@code runningTasks} so that it gets
 * finished in {@code tearDown()}.
 */
private ActionsTestTask createAndStartTask()
{
  // Every task gets a distinct "test_N" string built from the counter
  final String uniqueName = "test_" + groupId.incrementAndGet();
  final ActionsTestTask createdTask = new ActionsTestTask(WIKI, uniqueName, taskActionClientFactory);
  taskQueue.add(createdTask);
  runningTasks.add(createdTask);
  return createdTask;
}
/**
* Commits the given segments through the replace task, records every pending
* segment upgrade captured on the supervisor manager mock as an announced
* segment of the append task, clears the captures and finishes the replace task.
*/
private void commitReplaceSegments(DataSegment... dataSegments)
{
replaceTask.commitReplaceSegments(dataSegments);
// The three captures are filled by the same mocked call, so index i refers to one upgrade
for (int i = 0; i < supervisorId.getValues().size(); i++) {
announceUpgradedPendingSegment(oldPendingSegment.getValues().get(i), newPendingSegment.getValues().get(i));
}
// Clear the captured values so that a later commit does not see them again
supervisorId.reset();
oldPendingSegment.reset();
newPendingSegment.reset();
replaceTask.finishRunAndGetStatus();
}
/**
 * Commits the given segments through the append task, removes every segment
 * of the publish result from the announced segments of the append task,
 * remembers the load spec value of each given segment by its id, and finishes
 * the append task.
 *
 * @return the publish result of the append commit
 */
private SegmentPublishResult commitAppendSegments(DataSegment... dataSegments)
{
  final SegmentPublishResult publishResult = appendTask.commitAppendSegments(dataSegments);

  // Published segments are no longer tracked as announced pending segments
  for (DataSegment published : publishResult.getSegments()) {
    unannounceUpgradedPendingSegment(published);
  }

  // Remember the single load spec value of each committed segment
  for (DataSegment committed : dataSegments) {
    final SegmentId committedId = committed.getId();
    final Object loadSpecValue = Iterables.getOnlyElement(committed.getLoadSpec().values());
    parentSegmentToLoadSpec.put(committedId, loadSpecValue);
  }

  appendTask.finishRunAndGetStatus();
  return publishResult;
}
/**
 * Records on the append task that the new pending segment is an announced
 * segment whose parent is the old pending segment.
 */
private void announceUpgradedPendingSegment(
    SegmentIdWithShardSpec oldPendingSegment,
    SegmentIdWithShardSpec newPendingSegment
)
{
  final Map<SegmentId, SegmentId> announcedToParent = appendTask.getAnnouncedSegmentsToParentSegments();
  final SegmentId upgradedId = newPendingSegment.asSegmentId();
  final SegmentId parentId = oldPendingSegment.asSegmentId();
  announcedToParent.put(upgradedId, parentId);
}
/**
 * Removes the id of the given segment from the announced segments tracked
 * on the append task, if it is present.
 */
private void unannounceUpgradedPendingSegment(
    DataSegment segment
)
{
  final Map<SegmentId, SegmentId> announcedToParent = appendTask.getAnnouncedSegmentsToParentSegments();
  announcedToParent.remove(segment.getId());
}
/**
 * Asserts that, within a single version and interval, no load spec occurs
 * more than once across all used segments of the test datasource and all
 * upgraded pending segments currently announced on the append task.
 * <p>
 * For an announced pending segment, the load spec is the one remembered for
 * its parent segment in {@code commitAppendSegments()}; it is null if the
 * parent was never committed through that helper.
 */
private void verifyVersionIntervalLoadSpecUniqueness()
{
  for (DataSegment usedSegment : getAllUsedSegments()) {
    assertLoadSpecIsUnique(
        usedSegment.getVersion(),
        usedSegment.getInterval(),
        Iterables.getOnlyElement(usedSegment.getLoadSpec().values())
    );
  }

  for (Map.Entry<SegmentId, SegmentId> entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) {
    final SegmentId announcedId = entry.getKey();
    assertLoadSpecIsUnique(
        announcedId.getVersion(),
        announcedId.getInterval(),
        parentSegmentToLoadSpec.get(entry.getValue())
    );
  }
}

/**
 * Registers the load spec under the given version and interval and fails if
 * the same load spec has already been registered for that version and interval.
 */
private void assertLoadSpecIsUnique(String version, Interval interval, @Nullable Object loadSpec)
{
  final Set<Object> loadSpecs = versionToIntervalToLoadSpecs
      .computeIfAbsent(version, v -> new HashMap<>())
      .computeIfAbsent(interval, i -> new HashSet<>());
  // Set.add() returns false if the element is already present
  Assert.assertTrue(
      StringUtils.format(
          "Load spec[%s] occurs more than once for version[%s] and interval[%s]",
          loadSpec, version, interval
      ),
      loadSpecs.add(loadSpec)
  );
}
/**
 * Retrieves all used segments of the test datasource, including
 * overshadowed ones, over {@link Intervals#ETERNITY}.
 *
 * @throws ISE if the retrieval fails with an {@link IOException}
 */
private Collection<DataSegment> getAllUsedSegments()
{
  try {
    return dummyTaskActionClient.submit(
        new RetrieveUsedSegmentsAction(
            WIKI,
            null,
            ImmutableList.of(Intervals.ETERNITY),
            Segments.INCLUDING_OVERSHADOWED
        )
    );
  }
  catch (IOException e) {
    // Same exception type and message style as the other retrieval helpers of this class
    throw new ISE(e, "Error while fetching all used segments of datasource[%s]", WIKI);
  }
}
}