Associate pending segments with the tasks that requested them (#16144)
Changes:
- Add column `task_allocator_id` to `pendingSegments` metadata table.
- Add column `upgraded_from_segment_id` to `pendingSegments` metadata table.
- Add interface `PendingSegmentAllocatingTask` and implement it by all tasks which
can allocate pending segments.
- Use `taskAllocatorId` to identify the task (and its sub-tasks or replicas) to which
a pending segment has been allocated.
- Perform active cleanup of pending segments in `TaskLockbox` once there are no
active tasks for the corresponding task allocator id.
- When committing APPEND segments, also commit all upgraded pending segments
corresponding to that task allocator id.
- When committing REPLACE segments, upgrade all overlapping pending segments in
the same transaction.
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index 64cecdf..7bb57c5 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -40,6 +40,7 @@
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -71,7 +72,7 @@
import java.util.Set;
@JsonTypeName(MSQControllerTask.TYPE)
-public class MSQControllerTask extends AbstractTask implements ClientTaskQuery
+public class MSQControllerTask extends AbstractTask implements ClientTaskQuery, PendingSegmentAllocatingTask
{
public static final String TYPE = "query_controller";
public static final String DUMMY_DATASOURCE_FOR_SELECT = "__query_select";
@@ -157,6 +158,12 @@
return ImmutableSet.of();
}
+ @Override
+ public String getTaskAllocatorId()
+ {
+ return getId();
+ }
+
@JsonProperty("spec")
public MSQSpec getQuerySpec()
{
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
index 95abc6b..c049484 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
@@ -32,6 +32,7 @@
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.exec.Worker;
@@ -45,7 +46,7 @@
import java.util.Set;
@JsonTypeName(MSQWorkerTask.TYPE)
-public class MSQWorkerTask extends AbstractTask
+public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask
{
public static final String TYPE = "query_worker";
@@ -125,6 +126,12 @@
return ImmutableSet.of();
}
+ @Override
+ public String getTaskAllocatorId()
+ {
+ return getControllerTaskId();
+ }
+
@Override
public boolean isReady(final TaskActionClient taskActionClient)
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
index 6aaf21e..309eb38 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
@@ -35,7 +35,7 @@
public class MSQControllerTaskTest
{
- MSQSpec MSQ_SPEC = MSQSpec
+ private final MSQSpec MSQ_SPEC = MSQSpec
.builder()
.destination(new DataSourceMSQDestination(
"target",
@@ -59,7 +59,7 @@
@Test
public void testGetInputSourceResources()
{
- MSQControllerTask msqWorkerTask = new MSQControllerTask(
+ MSQControllerTask controllerTask = new MSQControllerTask(
null,
MSQ_SPEC,
null,
@@ -67,7 +67,25 @@
null,
null,
null,
- null);
- Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
+ null
+ );
+ Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty());
+ }
+
+ @Test
+ public void testGetTaskAllocatorId()
+ {
+ final String taskId = "taskId";
+ MSQControllerTask controllerTask = new MSQControllerTask(
+ taskId,
+ MSQ_SPEC,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(taskId, controllerTask.getTaskAllocatorId());
}
}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
index 37c31ba..482d67d 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
@@ -47,7 +47,6 @@
@Test
public void testEquals()
{
- Assert.assertNotEquals(msqWorkerTask, 0);
Assert.assertEquals(msqWorkerTask, msqWorkerTask);
Assert.assertEquals(
msqWorkerTask,
@@ -110,4 +109,11 @@
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
}
+ @Test
+ public void testGetTaskAllocatorId()
+ {
+ MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
+ Assert.assertEquals(controllerTaskId, msqWorkerTask.getTaskAllocatorId());
+ }
+
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
index 280f419..d030851 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
@@ -23,8 +23,10 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.LockRequestForNewSegment;
@@ -210,6 +212,12 @@
final TaskActionToolbox toolbox
)
{
+ if (!(task instanceof PendingSegmentAllocatingTask)) {
+ throw DruidException.defensive(
+ "Task[%s] of type[%s] cannot allocate segments as it does not implement PendingSegmentAllocatingTask.",
+ task.getId(), task.getType()
+ );
+ }
int attempt = 0;
while (true) {
attempt++;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
index 67b7017..1a1e6c7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
@@ -22,10 +22,12 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -41,8 +43,20 @@
import java.util.stream.Collectors;
/**
+ *
* Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
+ *
+ * <pre>
+ * Pseudo code (for a single interval):
+ * For an append lock held over an interval:
+ * transaction {
+ * commit input segments contained within interval
+ * if there is an active replace lock over the interval:
+ * add an entry for the inputSegment corresponding to the replace lock's task in the upgradeSegments table
+ * fetch pending segments with parent contained within the input segments, and commit them
+ * }
+ * </pre>
*/
public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
{
@@ -114,6 +128,13 @@
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
+ if (!(task instanceof PendingSegmentAllocatingTask)) {
+ throw DruidException.defensive(
+ "Task[%s] of type[%s] cannot append segments as it does not implement PendingSegmentAllocatingTask.",
+ task.getId(),
+ task.getType()
+ );
+ }
// Verify that all the locks are of expected type
final List<TaskLock> locks = toolbox.getTaskLockbox().findLocksForTask(task);
for (TaskLock lock : locks) {
@@ -132,17 +153,20 @@
= TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);
final CriticalAction.Action<SegmentPublishResult> publishAction;
+ final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
if (startMetadata == null) {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
- segmentToReplaceLock
+ segmentToReplaceLock,
+ taskAllocatorId
);
} else {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
segments,
segmentToReplaceLock,
startMetadata,
- endMetadata
+ endMetadata,
+ taskAllocatorId
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
index aaa62db..2f4a580 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
@@ -30,11 +30,14 @@
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,6 +45,20 @@
/**
* Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
+ *
+ * <pre>
+ * Pseudo code (for a single interval)
+ *- For a replace lock held over an interval:
+ * transaction {
+ * commit input segments contained within interval
+ * upgrade ids in the upgradeSegments table corresponding to this task to the replace lock's version and commit them
+ * fetch payload, task_allocator_id for pending segments
+ * upgrade each such pending segment to the replace lock's version with the corresponding root segment
+ * }
+ * For every pending segment with version == replace lock version:
+ * Fetch payload, group_id or the pending segment and relay them to the supervisor
+ * The supervisor relays the payloads to all the tasks with the corresponding group_id to serve realtime queries
+ * </pre>
*/
public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult>
{
@@ -123,7 +140,7 @@
// failure to upgrade pending segments does not affect success of the commit
if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
try {
- tryUpgradeOverlappingPendingSegments(task, toolbox);
+ registerUpgradedPendingSegmentsOnSupervisor(task, toolbox);
}
catch (Exception e) {
log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
@@ -134,34 +151,55 @@
}
/**
- * Tries to upgrade any pending segments that overlap with the committed segments.
+ * Registers upgraded pending segments on the active supervisor, if any
*/
- private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
+ private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox)
{
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorIdWithAppendLock =
supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
if (!activeSupervisorIdWithAppendLock.isPresent()) {
return;
}
- final Set<String> activeRealtimeSequencePrefixes
- = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
- Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
- toolbox.getIndexerMetadataStorageCoordinator()
- .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
- log.info(
- "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
- upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
- );
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+ .getTaskLockbox()
+ .getAllReplaceLocksForDatasource(task.getDataSource())
+ .stream()
+ .filter(lock -> task.getId().equals(lock.getSupervisorTaskId()))
+ .collect(Collectors.toSet());
- upgradedPendingSegments.forEach(
- (oldId, newId) -> toolbox.getSupervisorManager()
- .registerNewVersionOfPendingSegmentOnSupervisor(
- activeSupervisorIdWithAppendLock.get(),
- oldId,
- newId
- )
+
+ Set<PendingSegmentRecord> pendingSegments = new HashSet<>();
+ for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+ pendingSegments.addAll(
+ toolbox.getIndexerMetadataStorageCoordinator()
+ .getPendingSegments(task.getDataSource(), replaceLock.getInterval())
+ );
+ }
+ Map<String, SegmentIdWithShardSpec> idToPendingSegment = new HashMap<>();
+ pendingSegments.forEach(pendingSegment -> idToPendingSegment.put(
+ pendingSegment.getId().asSegmentId().toString(),
+ pendingSegment.getId()
+ ));
+ Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new HashMap<>();
+ pendingSegments.forEach(pendingSegment -> {
+ if (pendingSegment.getUpgradedFromSegmentId() != null
+ && !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) {
+ segmentToParent.put(
+ pendingSegment.getId(),
+ idToPendingSegment.get(pendingSegment.getUpgradedFromSegmentId())
+ );
+ }
+ });
+
+ segmentToParent.forEach(
+ (newId, oldId) -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
+ activeSupervisorIdWithAppendLock.get(),
+ oldId,
+ newId
+ )
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 1c581cd..4275926 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -119,7 +119,8 @@
import java.util.concurrent.TimeoutException;
@Deprecated
-public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements ChatHandler
+public class AppenderatorDriverRealtimeIndexTask extends AbstractTask
+ implements ChatHandler, PendingSegmentAllocatingTask
{
private static final String CTX_KEY_LOOKUP_TIER = "lookupTier";
@@ -260,6 +261,12 @@
}
@Override
+ public String getTaskAllocatorId()
+ {
+ return getGroupId();
+ }
+
+ @Override
public TaskStatus runTask(final TaskToolbox toolbox)
{
runThread = Thread.currentThread();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index e5df64c..f5aec08 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -127,7 +127,7 @@
* serialization fields of this class must correspond to those of {@link
* ClientCompactionTaskQuery}.
*/
-public class CompactionTask extends AbstractBatchIndexTask
+public class CompactionTask extends AbstractBatchIndexTask implements PendingSegmentAllocatingTask
{
private static final Logger log = new Logger(CompactionTask.class);
private static final Clock UTC_CLOCK = Clock.systemUTC();
@@ -400,6 +400,12 @@
return TYPE;
}
+ @Override
+ public String getTaskAllocatorId()
+ {
+ return getGroupId();
+ }
+
@Nonnull
@JsonIgnore
@Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 329bc0d..1796f6e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -137,7 +137,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
-public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
+public class IndexTask extends AbstractBatchIndexTask implements ChatHandler, PendingSegmentAllocatingTask
{
public static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();
@@ -302,6 +302,12 @@
}
}
+ @Override
+ public String getTaskAllocatorId()
+ {
+ return getGroupId();
+ }
+
@Nonnull
@JsonIgnore
@Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index 678d8ca..9d91542 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -39,7 +39,7 @@
/**
*/
-public class NoopTask extends AbstractTask
+public class NoopTask extends AbstractTask implements PendingSegmentAllocatingTask
{
private static final int DEFAULT_RUN_TIME = 2500;
@@ -111,6 +111,12 @@
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
+ @Override
+ public String getTaskAllocatorId()
+ {
+ return getId();
+ }
+
public static NoopTask create()
{
return forDatasource(null);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java
new file mode 100644
index 0000000..e392adc
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+/**
+ * An interface to be implemented by every appending task that allocates pending segments.
+ */
+public interface PendingSegmentAllocatingTask
+{
+ /**
+ * Unique string used by an appending task (or its sub-tasks and replicas) to allocate pending segments
+ * and identify pending segments allocated to it.
+ */
+ String getTaskAllocatorId();
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 5d72f4a..935adb3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -52,6 +52,7 @@
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
@@ -131,7 +132,8 @@
*
* @see ParallelIndexTaskRunner
*/
-public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implements ChatHandler
+public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
+ implements ChatHandler, PendingSegmentAllocatingTask
{
public static final String TYPE = "index_parallel";
@@ -476,6 +478,12 @@
);
}
+ @Override
+ public String getTaskAllocatorId()
+ {
+ return getGroupId();
+ }
+
@Nullable
@Override
public Granularity getSegmentGranularity()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 0b4f62e..e02d599 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -43,6 +43,7 @@
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
@@ -105,7 +106,7 @@
* generates and pushes segments, and reports them to the {@link SinglePhaseParallelIndexTaskRunner} instead of
* publishing on its own.
*/
-public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler
+public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler, PendingSegmentAllocatingTask
{
public static final String TYPE = "single_phase_sub_task";
public static final String OLD_TYPE_NAME = "index_sub";
@@ -237,6 +238,12 @@
}
@Override
+ public String getTaskAllocatorId()
+ {
+ return getGroupId();
+ }
+
+ @Override
public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
{
try {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 54e2919..35ec79d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -28,6 +28,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.LockGranularity;
@@ -38,6 +39,7 @@
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.SegmentAllocateResult;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
@@ -99,10 +101,20 @@
private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
- // Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks.
- // this set should be accessed under the giant lock.
+ /**
+ * Set of active tasks. Locks can be granted only to a task present in this set.
+ * Should be accessed only under the giant lock.
+ */
private final Set<String> activeTasks = new HashSet<>();
+ /**
+ * Map from a taskAllocatorId to the set of active taskIds using that allocator id.
+ * Used to clean up pending segments for a taskAllocatorId as soon as the set
+ * of corresponding active taskIds becomes empty.
+ */
+ @GuardedBy("giant")
+ private final Map<String, Set<String>> activeAllocatorIdToTaskIds = new HashMap<>();
+
@Inject
public TaskLockbox(
TaskStorage taskStorage,
@@ -213,6 +225,12 @@
activeTasks.remove(task.getId());
}
}
+ activeAllocatorIdToTaskIds.clear();
+ for (Task task : storedActiveTasks) {
+ if (activeTasks.contains(task.getId())) {
+ trackAppendingTask(task);
+ }
+ }
log.info(
"Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).",
@@ -387,7 +405,7 @@
if (request instanceof LockRequestForNewSegment) {
final LockRequestForNewSegment lockRequestForNewSegment = (LockRequestForNewSegment) request;
if (lockRequestForNewSegment.getGranularity() == LockGranularity.SEGMENT) {
- newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion());
+ newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion(), null);
if (newSegmentId == null) {
return LockResult.fail();
}
@@ -411,7 +429,12 @@
newSegmentId
);
}
- newSegmentId = allocateSegmentId(lockRequestForNewSegment, posseToUse.getTaskLock().getVersion());
+ final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
+ newSegmentId = allocateSegmentId(
+ lockRequestForNewSegment,
+ posseToUse.getTaskLock().getVersion(),
+ taskAllocatorId
+ );
}
}
return LockResult.ok(posseToUse.getTaskLock(), newSegmentId);
@@ -514,6 +537,7 @@
}
}
+ @Nullable
private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task, boolean persist)
{
Preconditions.checkState(!(request instanceof LockRequestForNewSegment), "Can't handle LockRequestForNewSegment");
@@ -710,7 +734,7 @@
}
}
- private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment request, String version)
+ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment request, String version, String allocatorId)
{
return metadataStorageCoordinator.allocatePendingSegment(
request.getDataSource(),
@@ -719,7 +743,8 @@
request.getInterval(),
request.getPartialShardSpec(),
version,
- request.isSkipSegmentLineageCheck()
+ request.isSkipSegmentLineageCheck(),
+ allocatorId
);
}
@@ -1159,12 +1184,25 @@
try {
log.info("Adding task[%s] to activeTasks", task.getId());
activeTasks.add(task.getId());
+ trackAppendingTask(task);
}
finally {
giant.unlock();
}
}
+ @GuardedBy("giant")
+ private void trackAppendingTask(Task task)
+ {
+ if (task instanceof PendingSegmentAllocatingTask) {
+ final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
+ if (taskAllocatorId != null) {
+ activeAllocatorIdToTaskIds.computeIfAbsent(taskAllocatorId, s -> new HashSet<>())
+ .add(task.getId());
+ }
+ }
+ }
+
/**
* Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task.
*
@@ -1176,13 +1214,35 @@
try {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
- if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
- final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
- log.info(
- "Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
- upgradeSegmentsDeleted,
- task.getId()
- );
+ try {
+ // Clean upgrade segments table for entries associated with replacing task
+ if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
+ final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
+ log.info(
+ "Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
+ upgradeSegmentsDeleted, task.getId()
+ );
+ }
+ // Clean pending segments associated with the appending task
+ if (task instanceof PendingSegmentAllocatingTask) {
+ final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
+ if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
+ final Set<String> idsInSameGroup = activeAllocatorIdToTaskIds.get(taskAllocatorId);
+ idsInSameGroup.remove(task.getId());
+ if (idsInSameGroup.isEmpty()) {
+ final int pendingSegmentsDeleted
+ = metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId);
+ log.info(
+ "Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.",
+ pendingSegmentsDeleted, taskAllocatorId
+ );
+ }
+ activeAllocatorIdToTaskIds.remove(taskAllocatorId);
+ }
+ }
+ }
+ catch (Exception e) {
+ log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments tables.");
}
unlockAll(task);
}
@@ -1771,7 +1831,9 @@
action.getSequenceName(),
action.getPreviousSegmentId(),
acquiredLock == null ? lockRequest.getVersion() : acquiredLock.getVersion(),
- action.getPartialShardSpec()
+ action.getPartialShardSpec(),
+ null,
+ ((PendingSegmentAllocatingTask) task).getTaskAllocatorId()
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 810a991..dd57b56 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -39,7 +39,6 @@
import javax.annotation.Nullable;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -126,15 +125,6 @@
return Optional.absent();
}
- public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
- {
- if (supervisors.containsKey(activeSupervisorId)) {
- return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes();
- } else {
- return Collections.emptySet();
- }
- }
-
public Optional<SupervisorSpec> getSupervisorSpec(String id)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
@@ -340,7 +330,7 @@
return true;
}
catch (Exception e) {
- log.error(e, "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed",
+ log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed",
basePendingSegment.asSegmentId(), newSegmentVersion.getVersion(), supervisorId);
}
return false;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 5450901..0ec9a67 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -37,6 +37,7 @@
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -61,7 +62,7 @@
public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
- extends AbstractTask implements ChatHandler
+ extends AbstractTask implements ChatHandler, PendingSegmentAllocatingTask
{
public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);
@@ -269,6 +270,12 @@
return !beforeMinimumMessageTime && !afterMaximumMessageTime;
}
+ @Override
+ public String getTaskAllocatorId()
+ {
+ return getTaskResource().getAvailabilityGroup();
+ }
+
protected abstract SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType> createTaskRunner();
/**
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 032142b..44f4ee1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -60,6 +60,7 @@
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
@@ -118,6 +119,7 @@
private SegmentsMetadataManager segmentsMetadataManager;
private TaskLockbox lockbox;
private File baseDir;
+ private SupervisorManager supervisorManager;
protected File reportsFile;
@Before
@@ -227,13 +229,14 @@
taskStorage,
storageCoordinator,
new NoopServiceEmitter(),
- null,
+ supervisorManager,
objectMapper
);
}
- public TaskToolbox createTaskToolbox(TaskConfig config, Task task)
+ public TaskToolbox createTaskToolbox(TaskConfig config, Task task, SupervisorManager supervisorManager)
{
+ this.supervisorManager = supervisorManager;
return new TaskToolbox.Builder()
.config(config)
.taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
index 69a8b6c..230cfa4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
@@ -36,10 +36,13 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -49,6 +52,7 @@
{
private final TaskActionClient client;
private final AtomicInteger sequenceId = new AtomicInteger(0);
+ private final Map<SegmentId, SegmentId> announcedSegmentsToParentSegments = new HashMap<>();
public ActionsTestTask(String datasource, String groupId, TaskActionClientFactory factory)
{
@@ -78,16 +82,25 @@
);
}
+ public Map<SegmentId, SegmentId> getAnnouncedSegmentsToParentSegments()
+ {
+ return announcedSegmentsToParentSegments;
+ }
+
public SegmentPublishResult commitAppendSegments(DataSegment... segments)
{
- return runAction(
+ SegmentPublishResult publishResult = runAction(
SegmentTransactionalAppendAction.forSegments(Sets.newHashSet(segments))
);
+ for (DataSegment segment : publishResult.getSegments()) {
+ announcedSegmentsToParentSegments.remove(segment.getId());
+ }
+ return publishResult;
}
public SegmentIdWithShardSpec allocateSegmentForTimestamp(DateTime timestamp, Granularity preferredSegmentGranularity)
{
- return runAction(
+ SegmentIdWithShardSpec pendingSegment = runAction(
new SegmentAllocateAction(
getDataSource(),
timestamp,
@@ -101,28 +114,8 @@
TaskLockType.APPEND
)
);
- }
-
- public SegmentIdWithShardSpec allocateSegmentForTimestamp(
- DateTime timestamp,
- Granularity preferredSegmentGranularity,
- String sequenceName
- )
- {
- return runAction(
- new SegmentAllocateAction(
- getDataSource(),
- timestamp,
- Granularities.SECOND,
- preferredSegmentGranularity,
- getId() + "__" + sequenceName,
- null,
- false,
- NumberedPartialShardSpec.instance(),
- LockGranularity.TIME_CHUNK,
- TaskLockType.APPEND
- )
- );
+ announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId());
+ return pendingSegment;
}
private <T> T runAction(TaskAction<T> action)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java
index 08e2c18..5945d14 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java
@@ -24,6 +24,7 @@
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -40,7 +41,7 @@
/**
* Test task that can be given a series of commands to execute in its {@link #runTask} method.
*/
-public class CommandQueueTask extends AbstractTask
+public class CommandQueueTask extends AbstractTask implements PendingSegmentAllocatingTask
{
private static final Logger log = new Logger(CommandQueueTask.class);
@@ -141,6 +142,12 @@
}
@Override
+ public String getTaskAllocatorId()
+ {
+ return getId();
+ }
+
+ @Override
public TaskStatus runTask(TaskToolbox taskToolbox)
{
TaskStatus status;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index cc498c7..415c63a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -728,6 +728,7 @@
// Append segment for Oct-Dec
final DataSegment segmentV02 = asSegment(pendingSegment02);
appendTask2.commitAppendSegments(segmentV02);
+ appendTask2.finishRunAndGetStatus();
verifyIntervalHasUsedSegments(YEAR_23, segmentV02);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV02);
@@ -747,12 +748,14 @@
// Append segment for Jan 1st
final DataSegment segmentV01 = asSegment(pendingSegment01);
appendTask.commitAppendSegments(segmentV01);
+ appendTask.finishRunAndGetStatus();
verifyIntervalHasUsedSegments(YEAR_23, segmentV01, segmentV02);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV01, segmentV02);
// Replace segment for whole year
final DataSegment segmentV10 = createSegment(YEAR_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
+ replaceTask.finishRunAndGetStatus();
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
@@ -767,6 +770,7 @@
// Append segment for quarter
final DataSegment segmentV03 = asSegment(pendingSegment03);
appendTask3.commitAppendSegments(segmentV03);
+ appendTask3.finishRunAndGetStatus();
final DataSegment segmentV13 = DataSegment.builder(segmentV03)
.version(v1)
@@ -1021,7 +1025,7 @@
@Override
public TaskToolbox build(TaskConfig config, Task task)
{
- return createTaskToolbox(config, task);
+ return createTaskToolbox(config, task, null);
}
};
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
new file mode 100644
index 0000000..50c3186
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
@@ -0,0 +1,881 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.concurrent;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskStorageDirTracker;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.config.TaskConfigBuilder;
+import org.apache.druid.indexing.common.task.IngestionTestBase;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.TaskQueue;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TestTaskToolboxFactory;
+import org.apache.druid.indexing.overlord.ThreadingTaskRunner;
+import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Contains tests to verify behaviour of concurrently running REPLACE and APPEND
+ * tasks on the same interval of a datasource.
+ * <p>
+ * The tests verify the interleaving of the following actions:
+ * <ul>
+ * <li>LOCK: Acquisition of a lock on an interval by a replace task</li>
+ * <li>ALLOCATE: Allocation of a pending segment by an append task</li>
+ * <li>REPLACE: Commit of segments created by a replace task</li>
+ * <li>APPEND: Commit of segments created by an append task</li>
+ * </ul>
+ */
+public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
+{
+ /**
+ * The version used by append jobs when no previous replace job has run on an interval.
+ */
+ private static final String SEGMENT_V0 = DateTimes.EPOCH.toString();
+
+ private static final Interval JAN_23 = Intervals.of("2023-01/2023-02");
+ private static final Interval FIRST_OF_JAN_23 = Intervals.of("2023-01-01/2023-01-02");
+
+ private static final String WIKI = "wiki";
+
+ private TaskQueue taskQueue;
+ private TaskActionClientFactory taskActionClientFactory;
+ private TaskActionClient dummyTaskActionClient;
+ private final List<ActionsTestTask> runningTasks = new ArrayList<>();
+
+ private ActionsTestTask appendTask;
+ private ActionsTestTask replaceTask;
+
+ private final AtomicInteger groupId = new AtomicInteger(0);
+ private final SupervisorManager supervisorManager = EasyMock.mock(SupervisorManager.class);
+ private Capture<String> supervisorId;
+ private Capture<SegmentIdWithShardSpec> oldPendingSegment;
+ private Capture<SegmentIdWithShardSpec> newPendingSegment;
+ private Map<String, Map<Interval, Set<Object>>> versionToIntervalToLoadSpecs;
+ private Map<SegmentId, Object> parentSegmentToLoadSpec;
+
+ @Override
+ @Before
+ public void setUpIngestionTestBase() throws IOException
+ {
+ EasyMock.reset(supervisorManager);
+ EasyMock.expect(supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(WIKI))
+ .andReturn(Optional.of(WIKI)).anyTimes();
+ super.setUpIngestionTestBase();
+ final TaskConfig taskConfig = new TaskConfigBuilder().build();
+ taskActionClientFactory = createActionClientFactory();
+ dummyTaskActionClient = taskActionClientFactory.create(NoopTask.create());
+
+ final WorkerConfig workerConfig = new WorkerConfig().setCapacity(10);
+ TaskRunner taskRunner = new ThreadingTaskRunner(
+ createToolboxFactory(taskConfig, taskActionClientFactory),
+ taskConfig,
+ workerConfig,
+ new NoopTaskLogs(),
+ getObjectMapper(),
+ new TestAppenderatorsManager(),
+ new MultipleFileTaskReportFileWriter(),
+ new DruidNode("middleManager", "host", false, 8091, null, true, false),
+ TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
+ );
+ taskQueue = new TaskQueue(
+ new TaskLockConfig(),
+ new TaskQueueConfig(null, new Period(0L), null, null, null),
+ new DefaultTaskConfig(),
+ getTaskStorage(),
+ taskRunner,
+ taskActionClientFactory,
+ getLockbox(),
+ new NoopServiceEmitter(),
+ getObjectMapper(),
+ new NoopTaskContextEnricher()
+ );
+ runningTasks.clear();
+ taskQueue.start();
+
+ groupId.set(0);
+ appendTask = createAndStartTask();
+ supervisorId = Capture.newInstance(CaptureType.ALL);
+ oldPendingSegment = Capture.newInstance(CaptureType.ALL);
+ newPendingSegment = Capture.newInstance(CaptureType.ALL);
+ EasyMock.expect(supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
+ EasyMock.capture(supervisorId),
+ EasyMock.capture(oldPendingSegment),
+ EasyMock.capture(newPendingSegment)
+ )).andReturn(true).anyTimes();
+ replaceTask = createAndStartTask();
+ EasyMock.replay(supervisorManager);
+ versionToIntervalToLoadSpecs = new HashMap<>();
+ parentSegmentToLoadSpec = new HashMap<>();
+ }
+
+ @After
+ public void tearDown()
+ {
+ verifyVersionIntervalLoadSpecUniqueness();
+ for (ActionsTestTask task : runningTasks) {
+ task.finishRunAndGetStatus();
+ }
+ }
+
+ @Test
+ public void testLockReplaceAllocateAppend()
+ {
+ final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+ final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(segmentV10.getVersion(), pendingSegment.getVersion());
+
+ final DataSegment segmentV11 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV11);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testLockAllocateAppendDayReplaceDay()
+ {
+ final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+ final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+
+ // Verify that the segment appended to v0 gets upgraded to v1
+ final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+ .shardSpec(new NumberedShardSpec(1, 1))
+ .version(v1).build();
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testLockAllocateReplaceDayAppendDay()
+ {
+ final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ // Verify that the segment appended to v0 gets upgraded to v1
+ final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+ .shardSpec(new NumberedShardSpec(1, 1))
+ .version(v1).build();
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testAllocateLockReplaceDayAppendDay()
+ {
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+ final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ // Verify that the segment appended to v0 gets upgraded to v1
+ final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+ .shardSpec(new NumberedShardSpec(1, 1))
+ .version(v1).build();
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testAllocateLockAppendDayReplaceDay()
+ {
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+ final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+ replaceTask.finishRunAndGetStatus();
+
+ // Verify that the segment appended to v0 gets upgraded to v1
+ final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+ .shardSpec(new NumberedShardSpec(1, 1))
+ .version(v1).build();
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testAllocateAppendDayLockReplaceDay()
+ {
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+ final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+ final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+
+ // Verify that the segment appended to v0 gets fully overshadowed
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+ }
+
+ @Test
+ public void testLockReplaceMonthAllocateAppendDay()
+ {
+ String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+ final DataSegment segmentV10 = createSegment(JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+ verifyIntervalHasUsedSegments(JAN_23, segmentV10);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+ // Verify that the allocated segment takes the version and interval of previous replace
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(JAN_23, pendingSegment.getInterval());
+ Assert.assertEquals(v1, pendingSegment.getVersion());
+
+ final DataSegment segmentV11 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV11);
+
+ verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testLockAllocateAppendDayReplaceMonth()
+ {
+ final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+ final DataSegment segmentV10 = createSegment(JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+
+ // Verify that append segment gets upgraded to replace version
+ final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+ .version(v1)
+ .interval(segmentV10.getInterval())
+ .shardSpec(new NumberedShardSpec(1, 1))
+ .build();
+ verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testLockAllocateReplaceMonthAppendDay()
+ {
+ final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV10 = createSegment(JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ // Verify that append segment gets upgraded to replace version
+ final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+ .version(v1)
+ .interval(segmentV10.getInterval())
+ .shardSpec(new NumberedShardSpec(1, 1))
+ .build();
+ verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testAllocateLockReplaceMonthAppendDay()
+ {
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+ final DataSegment segmentV10 = createSegment(JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ // Verify that append segment gets upgraded to replace version
+ final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+ .version(v1)
+ .interval(segmentV10.getInterval())
+ .shardSpec(new NumberedShardSpec(1, 1))
+ .build();
+ verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testAllocateLockAppendDayReplaceMonth()
+ {
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+ final DataSegment segmentV10 = createSegment(JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+
+ // Verify that append segment gets upgraded to replace version
+ final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+ .version(v1)
+ .interval(segmentV10.getInterval())
+ .shardSpec(new NumberedShardSpec(1, 1))
+ .build();
+ verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testAllocateAppendDayLockReplaceMonth()
+ {
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+ final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+ final DataSegment segmentV10 = createSegment(JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+
+ // Verify that the old segment gets completely replaced
+ verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV10);
+ }
+
+ @Test
+ public void testLockReplaceDayAllocateAppendMonth()
+ {
+ final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+ final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+ replaceTask.commitReplaceSegments(segmentV10);
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+ // Verify that an APPEND lock cannot be acquired on month
+ TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
+ Assert.assertNull(appendLock);
+
+ // Verify that new segment gets allocated with DAY granularity even though preferred was MONTH
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+ Assert.assertEquals(v1, pendingSegment.getVersion());
+ Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+
+ final DataSegment segmentV11 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV11);
+
+ verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testLockAllocateAppendMonthReplaceDay()
+ {
+ final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+ // Verify that an APPEND lock cannot be acquired on month
+ TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
+ Assert.assertNull(appendLock);
+
+ // Verify that the segment is allocated for DAY granularity
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+ Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+ final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+
+ // Verify that append segment gets upgraded to replace version
+ final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+ .version(v1)
+ .interval(segmentV10.getInterval())
+ .shardSpec(new NumberedShardSpec(1, 1))
+ .build();
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testLockAllocateReplaceDayAppendMonth()
+ {
+ final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+ // Verify that an APPEND lock cannot be acquired on month
+ TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
+ Assert.assertNull(appendLock);
+
+ // Verify that the segment is allocated for DAY granularity instead of MONTH
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+ Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+
+ final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+ .interval(FIRST_OF_JAN_23)
+ .version(v1)
+ .shardSpec(new NumberedShardSpec(1, 1))
+ .build();
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+ }
+
+ @Test
+ public void testAllocateLockReplaceDayAppendMonth()
+ {
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+ Assert.assertEquals(JAN_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ // Verify that replace lock cannot be acquired on MONTH
+ TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
+ Assert.assertNull(replaceLock);
+
+ // Verify that segment cannot be committed since there is no lock
+ final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, SEGMENT_V0);
+ final ISE exception = Assert.assertThrows(ISE.class, () -> commitReplaceSegments(segmentV10));
+ final Throwable throwable = Throwables.getRootCause(exception);
+ Assert.assertEquals(
+ StringUtils.format(
+ "Segments[[%s]] are not covered by locks[[]] for task[%s]",
+ segmentV10, replaceTask.getId()
+ ),
+ throwable.getMessage()
+ );
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ commitAppendSegments(segmentV01);
+ verifyIntervalHasUsedSegments(JAN_23, segmentV01);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
+ }
+
+ @Test
+ public void testAllocateAppendMonthLockReplaceDay()
+ {
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+ Assert.assertEquals(JAN_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ appendTask.commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(JAN_23, segmentV01);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
+
+ // Verify that replace lock cannot be acquired on DAY as MONTH is already locked
+ final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
+ Assert.assertNull(replaceLock);
+ }
+
+ @Test
+ public void testLockAllocateDayReplaceMonthAllocateAppend()
+ {
+ final SegmentIdWithShardSpec pendingSegmentV0
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+
+ final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+ final DataSegment segmentV10 = createSegment(JAN_23, v1);
+ commitReplaceSegments(segmentV10);
+ verifyIntervalHasUsedSegments(JAN_23, segmentV10);
+
+ final SegmentIdWithShardSpec pendingSegmentV1
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+ Assert.assertEquals(segmentV10.getVersion(), pendingSegmentV1.getVersion());
+
+ final DataSegment segmentV00 = asSegment(pendingSegmentV0);
+ final DataSegment segmentV11 = asSegment(pendingSegmentV1);
+ Set<DataSegment> appendSegments = commitAppendSegments(segmentV00, segmentV11)
+ .getSegments();
+
+ Assert.assertEquals(3, appendSegments.size());
+ // Segment V11 is committed
+ Assert.assertTrue(appendSegments.remove(segmentV11));
+ // Segment V00 is also committed
+ Assert.assertTrue(appendSegments.remove(segmentV00));
+ // Segment V00 is upgraded to v1 with MONTH granularlity at the time of commit as V12
+ final DataSegment segmentV12 = Iterables.getOnlyElement(appendSegments);
+ Assert.assertEquals(v1, segmentV12.getVersion());
+ Assert.assertEquals(JAN_23, segmentV12.getInterval());
+ Assert.assertEquals(segmentV00.getLoadSpec(), segmentV12.getLoadSpec());
+
+ verifyIntervalHasUsedSegments(JAN_23, segmentV00, segmentV10, segmentV11, segmentV12);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, segmentV12);
+ }
+
+
+ @Nullable
+ private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments)
+ {
+ for (DataSegment segment : segments) {
+ if (version.equals(segment.getVersion())
+ && Objects.equals(segment.getLoadSpec(), loadSpec)) {
+ return segment;
+ }
+ }
+
+ return null;
+ }
+
+ private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
+ {
+ final SegmentId id = pendingSegment.asSegmentId();
+ return new DataSegment(
+ id,
+ Collections.singletonMap(id.toString(), id.toString()),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ pendingSegment.getShardSpec(),
+ null,
+ 0,
+ 0
+ );
+ }
+
+ private void verifyIntervalHasUsedSegments(Interval interval, DataSegment... expectedSegments)
+ {
+ verifySegments(interval, Segments.INCLUDING_OVERSHADOWED, expectedSegments);
+ }
+
+ private void verifyIntervalHasVisibleSegments(Interval interval, DataSegment... expectedSegments)
+ {
+ verifySegments(interval, Segments.ONLY_VISIBLE, expectedSegments);
+ }
+
+ private void verifySegments(Interval interval, Segments visibility, DataSegment... expectedSegments)
+ {
+ try {
+ Collection<DataSegment> allUsedSegments = dummyTaskActionClient.submit(
+ new RetrieveUsedSegmentsAction(
+ WIKI,
+ null,
+ ImmutableList.of(interval),
+ visibility
+ )
+ );
+ Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
+ }
+ catch (IOException e) {
+ throw new ISE(e, "Error while fetching used segments in interval[%s]", interval);
+ }
+ }
+
+ private void verifyInputSegments(Task task, Interval interval, DataSegment... expectedSegments)
+ {
+ try {
+ final TaskActionClient taskActionClient = taskActionClientFactory.create(task);
+ Collection<DataSegment> allUsedSegments = taskActionClient.submit(
+ new RetrieveUsedSegmentsAction(
+ WIKI,
+ Collections.singletonList(interval)
+ )
+ );
+ Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
+ }
+ catch (IOException e) {
+ throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval);
+ }
+ }
+
+ private TaskToolboxFactory createToolboxFactory(
+ TaskConfig taskConfig,
+ TaskActionClientFactory taskActionClientFactory
+ )
+ {
+ TestTaskToolboxFactory.Builder builder = new TestTaskToolboxFactory.Builder()
+ .setConfig(taskConfig)
+ .setIndexIO(new IndexIO(getObjectMapper(), ColumnConfig.DEFAULT))
+ .setTaskActionClientFactory(taskActionClientFactory);
+ return new TestTaskToolboxFactory(builder)
+ {
+ @Override
+ public TaskToolbox build(TaskConfig config, Task task)
+ {
+ return createTaskToolbox(config, task, supervisorManager);
+ }
+ };
+ }
+
+ private DataSegment createSegment(Interval interval, String version)
+ {
+ SegmentId id = SegmentId.of(WIKI, interval, version, null);
+ return DataSegment.builder()
+ .dataSource(WIKI)
+ .interval(interval)
+ .version(version)
+ .loadSpec(Collections.singletonMap(id.toString(), id.toString()))
+ .size(100)
+ .build();
+ }
+
+ private ActionsTestTask createAndStartTask()
+ {
+ ActionsTestTask task = new ActionsTestTask(WIKI, "test_" + groupId.incrementAndGet(), taskActionClientFactory);
+ taskQueue.add(task);
+ runningTasks.add(task);
+ return task;
+ }
+
+ private void commitReplaceSegments(DataSegment... dataSegments)
+ {
+ replaceTask.commitReplaceSegments(dataSegments);
+ for (int i = 0; i < supervisorId.getValues().size(); i++) {
+ announceUpgradedPendingSegment(oldPendingSegment.getValues().get(i), newPendingSegment.getValues().get(i));
+ }
+ supervisorId.reset();
+ oldPendingSegment.reset();
+ newPendingSegment.reset();
+ replaceTask.finishRunAndGetStatus();
+ }
+
+ private SegmentPublishResult commitAppendSegments(DataSegment... dataSegments)
+ {
+ SegmentPublishResult result = appendTask.commitAppendSegments(dataSegments);
+ result.getSegments().forEach(this::unannounceUpgradedPendingSegment);
+ for (DataSegment segment : dataSegments) {
+ parentSegmentToLoadSpec.put(segment.getId(), Iterables.getOnlyElement(segment.getLoadSpec().values()));
+ }
+ appendTask.finishRunAndGetStatus();
+ return result;
+ }
+
+ private void announceUpgradedPendingSegment(
+ SegmentIdWithShardSpec oldPendingSegment,
+ SegmentIdWithShardSpec newPendingSegment
+ )
+ {
+ appendTask.getAnnouncedSegmentsToParentSegments()
+ .put(newPendingSegment.asSegmentId(), oldPendingSegment.asSegmentId());
+ }
+
+ private void unannounceUpgradedPendingSegment(
+ DataSegment segment
+ )
+ {
+ appendTask.getAnnouncedSegmentsToParentSegments()
+ .remove(segment.getId());
+ }
+
+ private void verifyVersionIntervalLoadSpecUniqueness()
+ {
+ for (DataSegment usedSegment : getAllUsedSegments()) {
+ final String version = usedSegment.getVersion();
+ final Interval interval = usedSegment.getInterval();
+ final Object loadSpec = Iterables.getOnlyElement(usedSegment.getLoadSpec().values());
+ Map<Interval, Set<Object>> intervalToLoadSpecs
+ = versionToIntervalToLoadSpecs.computeIfAbsent(version, v -> new HashMap<>());
+ Set<Object> loadSpecs
+ = intervalToLoadSpecs.computeIfAbsent(interval, i -> new HashSet<>());
+ Assert.assertFalse(loadSpecs.contains(loadSpec));
+ loadSpecs.add(loadSpec);
+ }
+
+ for (Map.Entry<SegmentId, SegmentId> entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) {
+ final String version = entry.getKey().getVersion();
+ final Interval interval = entry.getKey().getInterval();
+ final Object loadSpec = parentSegmentToLoadSpec.get(entry.getValue());
+ Map<Interval, Set<Object>> intervalToLoadSpecs
+ = versionToIntervalToLoadSpecs.computeIfAbsent(version, v -> new HashMap<>());
+ Set<Object> loadSpecs
+ = intervalToLoadSpecs.computeIfAbsent(interval, i -> new HashSet<>());
+ Assert.assertFalse(loadSpecs.contains(loadSpec));
+ loadSpecs.add(loadSpec);
+ }
+ }
+
+ private Collection<DataSegment> getAllUsedSegments()
+ {
+ try {
+ return dummyTaskActionClient.submit(
+ new RetrieveUsedSegmentsAction(
+ WIKI,
+ null,
+ ImmutableList.of(Intervals.ETERNITY),
+ Segments.INCLUDING_OVERSHADOWED
+ )
+ );
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 307879f..999d4d0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -1896,14 +1896,17 @@
}
@Test
- public void testUpgradeSegmentsCleanupOnUnlock()
+ public void testCleanupOnUnlock()
{
- final Task replaceTask = NoopTask.create();
- final Task appendTask = NoopTask.create();
+ final Task replaceTask = NoopTask.forDatasource("replace");
+ final Task appendTask = NoopTask.forDatasource("append");
final IndexerSQLMetadataStorageCoordinator coordinator
= EasyMock.createMock(IndexerSQLMetadataStorageCoordinator.class);
// Only the replaceTask should attempt a delete on the upgradeSegments table
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
+ // Any task may attempt pending segment clean up
+ EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(replaceTask.getId())).andReturn(0).once();
+ EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(appendTask.getId())).andReturn(0).once();
EasyMock.replay(coordinator);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 9ad3be6..f57494a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -30,6 +30,7 @@
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
@@ -175,7 +176,8 @@
@Override
public SegmentPublishResult commitAppendSegments(
Set<DataSegment> appendSegments,
- Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
+ Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+ String taskGroup
)
{
return SegmentPublishResult.ok(commitSegments(appendSegments));
@@ -186,7 +188,8 @@
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
DataSourceMetadata startMetadata,
- DataSourceMetadata endMetadata
+ DataSourceMetadata endMetadata,
+ String taskGroup
)
{
return SegmentPublishResult.ok(commitSegments(appendSegments));
@@ -228,7 +231,8 @@
Interval interval,
PartialShardSpec partialShardSpec,
String maxVersion,
- boolean skipSegmentLineageCheck
+ boolean skipSegmentLineageCheck,
+ String taskAllocatorId
)
{
return new SegmentIdWithShardSpec(
@@ -241,8 +245,7 @@
@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
- Set<DataSegment> replaceSegments,
- Set<String> activeBaseSequenceNames
+ Set<DataSegment> replaceSegments
)
{
return Collections.emptyMap();
@@ -285,6 +288,18 @@
throw new UnsupportedOperationException();
}
+ @Override
+ public int deletePendingSegmentsForTaskGroup(final String taskGroup)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval)
+ {
+ throw new UnsupportedOperationException();
+ }
+
public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index a889605..2390e7b 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
@@ -238,7 +239,7 @@
* identifier may have a version lower than this one, but will not have one higher.
* @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
- *
+ * @param taskAllocatorId The task allocator id with which the pending segment is associated
* @return the pending segment identifier, or null if it was impossible to allocate a new segment
*/
SegmentIdWithShardSpec allocatePendingSegment(
@@ -248,7 +249,8 @@
Interval interval,
PartialShardSpec partialShardSpec,
String maxVersion,
- boolean skipSegmentLineageCheck
+ boolean skipSegmentLineageCheck,
+ String taskAllocatorId
);
/**
@@ -322,10 +324,12 @@
* must be committed in a single transaction.
* @param appendSegmentToReplaceLock Map from append segment to the currently
* active REPLACE lock (if any) covering it
+ * @param taskAllocatorId allocator id of the task committing the segments to be appended
*/
SegmentPublishResult commitAppendSegments(
Set<DataSegment> appendSegments,
- Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
+ Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+ String taskAllocatorId
);
/**
@@ -340,7 +344,8 @@
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
DataSourceMetadata startMetadata,
- DataSourceMetadata endMetadata
+ DataSourceMetadata endMetadata,
+ String taskGroup
);
/**
@@ -373,13 +378,10 @@
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
- * @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task groups
- * of the supervisor (if any) for this datasource
* @return Map from originally allocated pending segment to its new upgraded ID.
*/
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
- Set<DataSegment> replaceSegments,
- Set<String> activeRealtimeSequencePrefixes
+ Set<DataSegment> replaceSegments
);
/**
@@ -476,4 +478,19 @@
* @return number of deleted entries from the metadata store
*/
int deleteUpgradeSegmentsForTask(String taskId);
+
+ /**
+ * Delete pending segment for a give task group after all the tasks belonging to it have completed.
+ * @param taskAllocatorId task id / task group / replica group for an appending task
+ * @return number of pending segments deleted from the metadata store
+ */
+ int deletePendingSegmentsForTaskGroup(String taskAllocatorId);
+
+ /**
+ * Fetches all the pending segments of the datasource that overlap with a given interval.
+ * @param datasource datasource to be queried
+ * @param interval interval with which segments overlap
+ * @return List of pending segment records
+ */
+ List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval);
}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
index b43e46d..49b31e5 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
@@ -38,18 +38,24 @@
private final String sequenceName;
private final String previousSegmentId;
private final PartialShardSpec partialShardSpec;
+ private final String upgradedFromSegmentId;
+ private final String taskAllocatorId;
public SegmentCreateRequest(
String sequenceName,
String previousSegmentId,
String version,
- PartialShardSpec partialShardSpec
+ PartialShardSpec partialShardSpec,
+ String upgradedFromSegmentId,
+ String taskAllocatorId
)
{
this.sequenceName = sequenceName;
this.previousSegmentId = previousSegmentId == null ? "" : previousSegmentId;
this.version = version;
this.partialShardSpec = partialShardSpec;
+ this.upgradedFromSegmentId = upgradedFromSegmentId;
+ this.taskAllocatorId = taskAllocatorId;
}
public String getSequenceName()
@@ -75,4 +81,14 @@
{
return partialShardSpec;
}
+
+ public String getUpgradedFromSegmentId()
+ {
+ return upgradedFromSegmentId;
+ }
+
+ public String getTaskAllocatorId()
+ {
+ return taskAllocatorId;
+ }
}
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index d364299..e7567ba 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -57,7 +57,6 @@
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
@@ -94,7 +93,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -281,99 +279,74 @@
}
/**
- * Fetches all the pending segments, whose interval overlaps with the given
- * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
- * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
+ * Fetches all the pending segments, whose interval overlaps with the given search interval, from the metadata store.
*/
@VisibleForTesting
- Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
- final Handle handle,
- final String dataSource,
- final Interval interval,
- final Set<String> sequenceNamePrefixFilter
- ) throws IOException
- {
- if (sequenceNamePrefixFilter.isEmpty()) {
- return Collections.emptyMap();
- }
-
- final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
- final List<String> sequenceNamePrefixConditions = new ArrayList<>();
- for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
- sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE :prefix%d)", i));
- }
-
- String sql = "SELECT sequence_name, payload"
- + " FROM " + dbTables.getPendingSegmentsTable()
- + " WHERE dataSource = :dataSource"
- + " AND start < :end"
- + StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString())
- + " AND ( " + String.join(" OR ", sequenceNamePrefixConditions) + " )";
-
- Query<Map<String, Object>> query = handle.createQuery(sql)
- .bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString());
-
- for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
- query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i) + "%");
- }
-
- final ResultIterator<PendingSegmentsRecord> dbSegments =
- query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
- .iterator();
-
- final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = new HashMap<>();
- while (dbSegments.hasNext()) {
- PendingSegmentsRecord record = dbSegments.next();
- final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
-
- if (interval.overlaps(identifier.getInterval())) {
- pendingSegmentToSequenceName.put(identifier, record.sequenceName);
- }
- }
-
- dbSegments.close();
-
- return pendingSegmentToSequenceName;
- }
-
- private Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+ List<PendingSegmentRecord> getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
final Interval interval
- ) throws IOException
+ )
{
- final ResultIterator<PendingSegmentsRecord> dbSegments =
- handle.createQuery(
- StringUtils.format(
- // This query might fail if the year has a different number of digits
- // See https://github.com/apache/druid/pull/11582 for a similar issue
- // Using long for these timestamps instead of varchar would give correct time comparisons
- "SELECT sequence_name, payload FROM %1$s"
- + " WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start",
- dbTables.getPendingSegmentsTable(), connector.getQuoteString()
- )
- )
- .bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString())
- .map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
- .iterator();
+ final boolean compareIntervalEndpointsAsStrings = Intervals.canCompareEndpointsAsStrings(interval);
- final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = new HashMap<>();
- while (dbSegments.hasNext()) {
- PendingSegmentsRecord record = dbSegments.next();
- final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
-
- if (interval.overlaps(identifier.getInterval())) {
- pendingSegmentToSequenceName.put(identifier, record.sequenceName);
- }
+ String sql = "SELECT payload, sequence_name, sequence_prev_id, task_allocator_id, upgraded_from_segment_id"
+ + " FROM " + dbTables.getPendingSegmentsTable()
+ + " WHERE dataSource = :dataSource";
+ if (compareIntervalEndpointsAsStrings) {
+ sql = sql
+ + " AND start < :end"
+ + StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString());
}
- dbSegments.close();
+ Query<Map<String, Object>> query = handle.createQuery(sql)
+ .bind("dataSource", dataSource);
+ if (compareIntervalEndpointsAsStrings) {
+ query = query.bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString());
+ }
- return pendingSegmentToSequenceName;
+
+ final ResultIterator<PendingSegmentRecord> pendingSegmentIterator =
+ query.map((index, r, ctx) -> PendingSegmentRecord.fromResultSet(r, jsonMapper))
+ .iterator();
+ final ImmutableList.Builder<PendingSegmentRecord> pendingSegments = ImmutableList.builder();
+ while (pendingSegmentIterator.hasNext()) {
+ final PendingSegmentRecord pendingSegment = pendingSegmentIterator.next();
+ if (compareIntervalEndpointsAsStrings || pendingSegment.getId().getInterval().overlaps(interval)) {
+ pendingSegments.add(pendingSegment);
+ }
+ }
+ pendingSegmentIterator.close();
+ return pendingSegments.build();
+ }
+
+ List<PendingSegmentRecord> getPendingSegmentsForTaskAllocatorIdWithHandle(
+ final Handle handle,
+ final String dataSource,
+ final String taskAllocatorId
+ )
+ {
+ String sql = "SELECT payload, sequence_name, sequence_prev_id, task_allocator_id, upgraded_from_segment_id"
+ + " FROM " + dbTables.getPendingSegmentsTable()
+ + " WHERE dataSource = :dataSource AND task_allocator_id = :task_allocator_id";
+
+ Query<Map<String, Object>> query = handle.createQuery(sql)
+ .bind("dataSource", dataSource)
+ .bind("task_allocator_id", taskAllocatorId);
+
+ final ResultIterator<PendingSegmentRecord> pendingSegmentRecords =
+ query.map((index, r, ctx) -> PendingSegmentRecord.fromResultSet(r, jsonMapper))
+ .iterator();
+
+ final List<PendingSegmentRecord> pendingSegments = new ArrayList<>();
+ while (pendingSegmentRecords.hasNext()) {
+ pendingSegments.add(pendingSegmentRecords.next());
+ }
+
+ pendingSegmentRecords.close();
+
+ return pendingSegments;
}
private SegmentTimeline getTimelineForIntervalsWithHandle(
@@ -503,9 +476,11 @@
segmentsToInsert.addAll(
createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask)
);
- return SegmentPublishResult.ok(
+ SegmentPublishResult result = SegmentPublishResult.ok(
insertSegments(handle, segmentsToInsert)
);
+ upgradePendingSegmentsOverlappingWith(segmentsToInsert);
+ return result;
},
3,
getSqlMetadataMaxRetry()
@@ -519,14 +494,16 @@
@Override
public SegmentPublishResult commitAppendSegments(
final Set<DataSegment> appendSegments,
- final Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
+ final Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+ final String taskAllocatorId
)
{
return commitAppendSegmentsAndMetadataInTransaction(
appendSegments,
appendSegmentToReplaceLock,
null,
- null
+ null,
+ taskAllocatorId
);
}
@@ -535,14 +512,16 @@
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
DataSourceMetadata startMetadata,
- DataSourceMetadata endMetadata
+ DataSourceMetadata endMetadata,
+ String taskAllocatorId
)
{
return commitAppendSegmentsAndMetadataInTransaction(
appendSegments,
appendSegmentToReplaceLock,
startMetadata,
- endMetadata
+ endMetadata,
+ taskAllocatorId
);
}
@@ -645,7 +624,8 @@
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion,
- final boolean skipSegmentLineageCheck
+ final boolean skipSegmentLineageCheck,
+ String taskAllocatorId
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
@@ -677,7 +657,8 @@
allocateInterval,
partialShardSpec,
maxVersion,
- existingChunks
+ existingChunks,
+ taskAllocatorId
);
} else {
return allocatePendingSegmentWithSegmentLineageCheck(
@@ -688,7 +669,8 @@
allocateInterval,
partialShardSpec,
maxVersion,
- existingChunks
+ existingChunks,
+ taskAllocatorId
);
}
}
@@ -697,8 +679,7 @@
@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
- Set<DataSegment> replaceSegments,
- Set<String> activeRealtimeSequencePrefixes
+ Set<DataSegment> replaceSegments
)
{
if (replaceSegments.isEmpty()) {
@@ -717,7 +698,7 @@
final String datasource = replaceSegments.iterator().next().getDataSource();
return connector.retryWithHandle(
- handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId, activeRealtimeSequencePrefixes)
+ handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId)
);
}
@@ -736,11 +717,10 @@
private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegments(
Handle handle,
String datasource,
- Map<Interval, DataSegment> replaceIntervalToMaxId,
- Set<String> activeRealtimeSequencePrefixes
- ) throws IOException
+ Map<Interval, DataSegment> replaceIntervalToMaxId
+ ) throws JsonProcessingException
{
- final Map<SegmentCreateRequest, SegmentIdWithShardSpec> newPendingSegmentVersions = new HashMap<>();
+ final List<PendingSegmentRecord> upgradedPendingSegments = new ArrayList<>();
final Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> pendingSegmentToNewId = new HashMap<>();
for (Map.Entry<Interval, DataSegment> entry : replaceIntervalToMaxId.entrySet()) {
@@ -751,15 +731,13 @@
final int numCorePartitions = maxSegmentId.getShardSpec().getNumCorePartitions();
int currentPartitionNumber = maxSegmentId.getShardSpec().getPartitionNum();
- final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
- = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval, activeRealtimeSequencePrefixes);
+ final List<PendingSegmentRecord> overlappingPendingSegments
+ = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval);
- for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment
- : overlappingPendingSegments.entrySet()) {
- final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey();
- final String pendingSegmentSequence = overlappingPendingSegment.getValue();
+ for (PendingSegmentRecord overlappingPendingSegment : overlappingPendingSegments) {
+ final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getId();
- if (shouldUpgradePendingSegment(pendingSegmentId, pendingSegmentSequence, replaceInterval, replaceVersion)) {
+ if (shouldUpgradePendingSegment(overlappingPendingSegment, replaceInterval, replaceVersion)) {
// Ensure unique sequence_name_prev_id_sha1 by setting
// sequence_prev_id -> pendingSegmentId
// sequence_name -> prefix + replaceVersion
@@ -769,14 +747,14 @@
replaceVersion,
new NumberedShardSpec(++currentPartitionNumber, numCorePartitions)
);
- newPendingSegmentVersions.put(
- new SegmentCreateRequest(
+ upgradedPendingSegments.add(
+ new PendingSegmentRecord(
+ newId,
UPGRADED_PENDING_SEGMENT_PREFIX + replaceVersion,
pendingSegmentId.toString(),
- replaceVersion,
- NumberedPartialShardSpec.instance()
- ),
- newId
+ pendingSegmentId.toString(),
+ overlappingPendingSegment.getTaskAllocatorId()
+ )
);
pendingSegmentToNewId.put(pendingSegmentId, newId);
}
@@ -787,33 +765,34 @@
// includes hash of both sequence_name and prev_segment_id
int numInsertedPendingSegments = insertPendingSegmentsIntoMetastore(
handle,
- newPendingSegmentVersions,
+ upgradedPendingSegments,
datasource,
false
);
log.info(
"Inserted total [%d] new versions for [%d] pending segments.",
- numInsertedPendingSegments, newPendingSegmentVersions.size()
+ numInsertedPendingSegments, upgradedPendingSegments.size()
);
return pendingSegmentToNewId;
}
private boolean shouldUpgradePendingSegment(
- SegmentIdWithShardSpec pendingSegmentId,
- String pendingSegmentSequenceName,
+ PendingSegmentRecord pendingSegment,
Interval replaceInterval,
String replaceVersion
)
{
- if (pendingSegmentId.getVersion().compareTo(replaceVersion) >= 0) {
+ if (pendingSegment.getTaskAllocatorId() == null) {
return false;
- } else if (!replaceInterval.contains(pendingSegmentId.getInterval())) {
+ } else if (pendingSegment.getId().getVersion().compareTo(replaceVersion) >= 0) {
+ return false;
+ } else if (!replaceInterval.contains(pendingSegment.getId().getInterval())) {
return false;
} else {
// Do not upgrade already upgraded pending segment
- return pendingSegmentSequenceName == null
- || !pendingSegmentSequenceName.startsWith(UPGRADED_PENDING_SEGMENT_PREFIX);
+ return pendingSegment.getSequenceName() == null
+ || !pendingSegment.getSequenceName().startsWith(UPGRADED_PENDING_SEGMENT_PREFIX);
}
}
@@ -826,7 +805,8 @@
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion,
- final List<TimelineObjectHolder<String, DataSegment>> existingChunks
+ final List<TimelineObjectHolder<String, DataSegment>> existingChunks,
+ final String taskAllocatorId
) throws IOException
{
final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
@@ -896,7 +876,8 @@
interval,
previousSegmentIdNotNull,
sequenceName,
- sequenceNamePrevIdSha1
+ sequenceNamePrevIdSha1,
+ taskAllocatorId
);
return newIdentifier;
}
@@ -947,7 +928,7 @@
}
// For each of the remaining requests, create a new segment
- final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = createNewSegments(
+ final Map<SegmentCreateRequest, PendingSegmentRecord> createdSegments = createNewSegments(
handle,
dataSource,
interval,
@@ -965,12 +946,14 @@
// have difficulty with large unique keys (see https://github.com/apache/druid/issues/2319)
insertPendingSegmentsIntoMetastore(
handle,
- createdSegments,
+ ImmutableList.copyOf(createdSegments.values()),
dataSource,
skipSegmentLineageCheck
);
- allocatedSegmentIds.putAll(createdSegments);
+ for (Map.Entry<SegmentCreateRequest, PendingSegmentRecord> entry : createdSegments.entrySet()) {
+ allocatedSegmentIds.put(entry.getKey(), entry.getValue().getId());
+ }
return allocatedSegmentIds;
}
@@ -1009,7 +992,8 @@
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion,
- final List<TimelineObjectHolder<String, DataSegment>> existingChunks
+ final List<TimelineObjectHolder<String, DataSegment>> existingChunks,
+ final String taskAllocatorId
) throws IOException
{
final String sql = StringUtils.format(
@@ -1073,7 +1057,9 @@
);
// always insert empty previous sequence id
- insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
+ insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1,
+ taskAllocatorId
+ );
log.info(
"Created new pending segment[%s] for datasource[%s], sequence[%s], interval[%s].",
@@ -1281,7 +1267,8 @@
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
@Nullable DataSourceMetadata startMetadata,
- @Nullable DataSourceMetadata endMetadata
+ @Nullable DataSourceMetadata endMetadata,
+ String taskAllocatorId
)
{
verifySegmentsToCommit(appendSegments);
@@ -1291,16 +1278,38 @@
}
final String dataSource = appendSegments.iterator().next().getDataSource();
- final Set<DataSegment> segmentIdsForNewVersions = connector.retryTransaction(
+ final List<PendingSegmentRecord> segmentIdsForNewVersions = connector.retryTransaction(
(handle, transactionStatus)
- -> createNewIdsForAppendSegments(handle, dataSource, appendSegments),
+ -> getPendingSegmentsForTaskAllocatorIdWithHandle(handle, dataSource, taskAllocatorId),
0,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
+
// Create entries for all required versions of the append segments
final Set<DataSegment> allSegmentsToInsert = new HashSet<>(appendSegments);
- allSegmentsToInsert.addAll(segmentIdsForNewVersions);
+
+ final Map<String, DataSegment> segmentIdMap = new HashMap<>();
+ appendSegments.forEach(segment -> segmentIdMap.put(segment.getId().toString(), segment));
+ segmentIdsForNewVersions.forEach(
+ pendingSegment -> {
+ if (segmentIdMap.containsKey(pendingSegment.getUpgradedFromSegmentId())) {
+ final DataSegment oldSegment = segmentIdMap.get(pendingSegment.getUpgradedFromSegmentId());
+ allSegmentsToInsert.add(
+ new DataSegment(
+ pendingSegment.getId().asSegmentId(),
+ oldSegment.getLoadSpec(),
+ oldSegment.getDimensions(),
+ oldSegment.getMetrics(),
+ pendingSegment.getId().getShardSpec(),
+ oldSegment.getLastCompactionState(),
+ oldSegment.getBinaryVersion(),
+ oldSegment.getSize()
+ )
+ );
+ }
+ }
+ );
final AtomicBoolean metadataNotUpdated = new AtomicBoolean(false);
try {
@@ -1341,31 +1350,27 @@
}
}
- private int insertPendingSegmentsIntoMetastore(
+ @VisibleForTesting
+ int insertPendingSegmentsIntoMetastore(
Handle handle,
- Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments,
+ List<PendingSegmentRecord> pendingSegments,
String dataSource,
boolean skipSegmentLineageCheck
) throws JsonProcessingException
{
final PreparedBatch insertBatch = handle.prepareBatch(
StringUtils.format(
- "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
- + "sequence_name_prev_id_sha1, payload) "
- + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
- + ":sequence_name_prev_id_sha1, :payload)",
- dbTables.getPendingSegmentsTable(),
- connector.getQuoteString()
- ));
-
- // Deduplicate the segment ids by inverting the map
- Map<SegmentIdWithShardSpec, SegmentCreateRequest> segmentIdToRequest = new HashMap<>();
- createdSegments.forEach((request, segmentId) -> segmentIdToRequest.put(segmentId, request));
+ "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
+ + "sequence_name_prev_id_sha1, payload, task_allocator_id, upgraded_from_segment_id) "
+ + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
+ + ":sequence_name_prev_id_sha1, :payload, :task_allocator_id, :upgraded_from_segment_id)",
+ dbTables.getPendingSegmentsTable(),
+ connector.getQuoteString()
+ ));
final String now = DateTimes.nowUtc().toString();
- for (Map.Entry<SegmentIdWithShardSpec, SegmentCreateRequest> entry : segmentIdToRequest.entrySet()) {
- final SegmentCreateRequest request = entry.getValue();
- final SegmentIdWithShardSpec segmentId = entry.getKey();
+ for (PendingSegmentRecord pendingSegment : pendingSegments) {
+ final SegmentIdWithShardSpec segmentId = pendingSegment.getId();
final Interval interval = segmentId.getInterval();
insertBatch.add()
@@ -1374,13 +1379,15 @@
.bind("created_date", now)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
- .bind("sequence_name", request.getSequenceName())
- .bind("sequence_prev_id", request.getPreviousSegmentId())
+ .bind("sequence_name", pendingSegment.getSequenceName())
+ .bind("sequence_prev_id", pendingSegment.getSequencePrevId())
.bind(
"sequence_name_prev_id_sha1",
- getSequenceNameAndPrevIdSha(request, segmentId, skipSegmentLineageCheck)
+ pendingSegment.computeSequenceNamePrevIdSha1(skipSegmentLineageCheck)
)
- .bind("payload", jsonMapper.writeValueAsBytes(segmentId));
+ .bind("payload", jsonMapper.writeValueAsBytes(segmentId))
+ .bind("task_allocator_id", pendingSegment.getTaskAllocatorId())
+ .bind("upgraded_from_segment_id", pendingSegment.getUpgradedFromSegmentId());
}
int[] updated = insertBatch.execute();
return Arrays.stream(updated).sum();
@@ -1393,15 +1400,16 @@
Interval interval,
String previousSegmentId,
String sequenceName,
- String sequenceNamePrevIdSha1
+ String sequenceNamePrevIdSha1,
+ String taskAllocatorId
) throws JsonProcessingException
{
handle.createStatement(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
- + "sequence_name_prev_id_sha1, payload) "
+ + "sequence_name_prev_id_sha1, payload, task_allocator_id) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
- + ":sequence_name_prev_id_sha1, :payload)",
+ + ":sequence_name_prev_id_sha1, :payload, :task_allocator_id)",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
)
@@ -1415,188 +1423,18 @@
.bind("sequence_prev_id", previousSegmentId)
.bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)
.bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
+ .bind("task_allocator_id", taskAllocatorId)
.execute();
}
- /**
- * Creates new IDs for the given append segments if a REPLACE task started and
- * finished after these append segments had already been allocated. The newly
- * created IDs belong to the same interval and version as the segments committed
- * by the REPLACE task.
- */
- private Set<DataSegment> createNewIdsForAppendSegments(
- Handle handle,
- String dataSource,
- Set<DataSegment> segmentsToAppend
- ) throws IOException
- {
- if (segmentsToAppend.isEmpty()) {
- return Collections.emptySet();
- }
-
- final Set<Interval> appendIntervals = new HashSet<>();
- final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new TreeMap<>();
- for (DataSegment segment : segmentsToAppend) {
- appendIntervals.add(segment.getInterval());
- appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
- .add(segment);
- }
-
- // Fetch all used segments that overlap with any of the append intervals
- final Collection<DataSegment> overlappingSegments = retrieveUsedSegmentsForIntervals(
- dataSource,
- new ArrayList<>(appendIntervals),
- Segments.INCLUDING_OVERSHADOWED
- );
-
- final Map<String, Set<Interval>> overlappingVersionToIntervals = new HashMap<>();
- final Map<Interval, Set<DataSegment>> overlappingIntervalToSegments = new HashMap<>();
- for (DataSegment segment : overlappingSegments) {
- overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
- .add(segment.getInterval());
- overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>())
- .add(segment);
- }
-
- final Set<DataSegment> upgradedSegments = new HashSet<>();
- for (Map.Entry<String, Set<Interval>> entry : overlappingVersionToIntervals.entrySet()) {
- final String upgradeVersion = entry.getKey();
- Map<Interval, Set<DataSegment>> segmentsToUpgrade = getSegmentsWithVersionLowerThan(
- upgradeVersion,
- entry.getValue(),
- appendVersionToSegments
- );
- for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry : segmentsToUpgrade.entrySet()) {
- final Interval upgradeInterval = upgradeEntry.getKey();
- final Set<DataSegment> segmentsAlreadyOnVersion
- = overlappingIntervalToSegments.getOrDefault(upgradeInterval, Collections.emptySet())
- .stream()
- .filter(s -> s.getVersion().equals(upgradeVersion))
- .collect(Collectors.toSet());
- Set<DataSegment> segmentsUpgradedToVersion = createNewIdsForAppendSegmentsWithVersion(
- handle,
- upgradeVersion,
- upgradeInterval,
- upgradeEntry.getValue(),
- segmentsAlreadyOnVersion
- );
- log.info("Upgraded [%d] segments to version[%s].", segmentsUpgradedToVersion.size(), upgradeVersion);
- upgradedSegments.addAll(segmentsUpgradedToVersion);
- }
- }
-
- return upgradedSegments;
- }
-
- /**
- * Creates a Map from eligible interval to Set of segments that are fully
- * contained in that interval and have a version strictly lower than {@code #cutoffVersion}.
- */
- private Map<Interval, Set<DataSegment>> getSegmentsWithVersionLowerThan(
- String cutoffVersion,
- Set<Interval> eligibleIntervals,
- TreeMap<String, Set<DataSegment>> versionToSegments
- )
- {
- final Set<DataSegment> eligibleSegments
- = versionToSegments.headMap(cutoffVersion).values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
-
- final Map<Interval, Set<DataSegment>> eligibleIntervalToSegments = new HashMap<>();
-
- for (DataSegment segment : eligibleSegments) {
- final Interval segmentInterval = segment.getInterval();
- for (Interval eligibleInterval : eligibleIntervals) {
- if (eligibleInterval.contains(segmentInterval)) {
- eligibleIntervalToSegments.computeIfAbsent(eligibleInterval, itvl -> new HashSet<>())
- .add(segment);
- break;
- } else if (eligibleInterval.overlaps(segmentInterval)) {
- // Committed interval overlaps only partially
- throw new ISE(
- "Committed interval[%s] conflicts with interval[%s] of append segment[%s].",
- eligibleInterval, segmentInterval, segment.getId()
- );
- }
- }
- }
-
- return eligibleIntervalToSegments;
- }
-
- /**
- * Computes new segment IDs that belong to the upgradeInterval and upgradeVersion.
- *
- * @param committedSegments Segments that already exist in the upgradeInterval
- * at upgradeVersion.
- */
- private Set<DataSegment> createNewIdsForAppendSegmentsWithVersion(
- Handle handle,
- String upgradeVersion,
- Interval upgradeInterval,
- Set<DataSegment> segmentsToUpgrade,
- Set<DataSegment> committedSegments
- ) throws IOException
- {
- // Find the committed segments with the higest partition number
- SegmentIdWithShardSpec committedMaxId = null;
- for (DataSegment committedSegment : committedSegments) {
- if (committedMaxId == null
- || committedMaxId.getShardSpec().getPartitionNum() < committedSegment.getShardSpec().getPartitionNum()) {
- committedMaxId = SegmentIdWithShardSpec.fromDataSegment(committedSegment);
- }
- }
-
- // Get pending segments for the new version to determine the next partition number to allocate
- final String dataSource = segmentsToUpgrade.iterator().next().getDataSource();
- final Set<SegmentIdWithShardSpec> pendingSegmentIds
- = getPendingSegmentsForIntervalWithHandle(handle, dataSource, upgradeInterval).keySet();
- final Set<SegmentIdWithShardSpec> allAllocatedIds = new HashSet<>(pendingSegmentIds);
-
- // Create new IDs for each append segment
- final Set<DataSegment> newSegmentIds = new HashSet<>();
- for (DataSegment segment : segmentsToUpgrade) {
- SegmentCreateRequest request = new SegmentCreateRequest(
- segment.getId() + "__" + upgradeVersion,
- null,
- upgradeVersion,
- NumberedPartialShardSpec.instance()
- );
-
- // Create new segment ID based on committed segments, allocated pending segments
- // and new IDs created so far in this method
- final SegmentIdWithShardSpec newId = createNewSegment(
- request,
- dataSource,
- upgradeInterval,
- upgradeVersion,
- committedMaxId,
- allAllocatedIds
- );
-
- // Update the set so that subsequent segment IDs use a higher partition number
- allAllocatedIds.add(newId);
- newSegmentIds.add(
- DataSegment.builder(segment)
- .interval(newId.getInterval())
- .version(newId.getVersion())
- .shardSpec(newId.getShardSpec())
- .build()
- );
- }
-
- return newSegmentIds;
- }
-
- private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(
+ private Map<SegmentCreateRequest, PendingSegmentRecord> createNewSegments(
Handle handle,
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
List<TimelineObjectHolder<String, DataSegment>> existingChunks,
List<SegmentCreateRequest> requests
- ) throws IOException
+ )
{
if (requests.isEmpty()) {
return Collections.emptyMap();
@@ -1637,18 +1475,21 @@
// across all shard specs (published + pending).
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
- final Set<SegmentIdWithShardSpec> pendingSegments =
- new HashSet<>(getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet());
+ final Set<SegmentIdWithShardSpec> pendingSegments = new HashSet<>(
+ getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).stream()
+ .map(PendingSegmentRecord::getId)
+ .collect(Collectors.toSet())
+ );
- final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = new HashMap<>();
- final Map<UniqueAllocateRequest, SegmentIdWithShardSpec> uniqueRequestToSegment = new HashMap<>();
+ final Map<SegmentCreateRequest, PendingSegmentRecord> createdSegments = new HashMap<>();
+ final Map<UniqueAllocateRequest, PendingSegmentRecord> uniqueRequestToSegment = new HashMap<>();
for (SegmentCreateRequest request : requests) {
// Check if the required segment has already been created in this batch
final UniqueAllocateRequest uniqueRequest =
new UniqueAllocateRequest(interval, request, skipSegmentLineageCheck);
- final SegmentIdWithShardSpec createdSegment;
+ final PendingSegmentRecord createdSegment;
if (uniqueRequestToSegment.containsKey(uniqueRequest)) {
createdSegment = uniqueRequestToSegment.get(uniqueRequest);
} else {
@@ -1663,9 +1504,9 @@
// Add to pendingSegments to consider for partitionId
if (createdSegment != null) {
- pendingSegments.add(createdSegment);
+ pendingSegments.add(createdSegment.getId());
uniqueRequestToSegment.put(uniqueRequest, createdSegment);
- log.info("Created new segment[%s]", createdSegment);
+ log.info("Created new segment[%s]", createdSegment.getId());
}
}
@@ -1678,7 +1519,7 @@
return createdSegments;
}
- private SegmentIdWithShardSpec createNewSegment(
+ private PendingSegmentRecord createNewSegment(
SegmentCreateRequest request,
String dataSource,
Interval interval,
@@ -1731,12 +1572,19 @@
: PartitionIds.ROOT_GEN_START_PARTITION_ID;
String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
- return new SegmentIdWithShardSpec(
+ SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec(
dataSource,
interval,
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
+ return new PendingSegmentRecord(
+ pendingSegmentId,
+ request.getSequenceName(),
+ request.getPreviousSegmentId(),
+ request.getUpgradedFromSegmentId(),
+ request.getTaskAllocatorId()
+ );
} else if (!overallMaxId.getInterval().equals(interval)) {
log.warn(
@@ -1761,7 +1609,7 @@
// When the core partitions have been dropped, using pending segments may lead to an incorrect state
// where the chunk is believed to have core partitions and queries results are incorrect.
- return new SegmentIdWithShardSpec(
+ SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec(
dataSource,
interval,
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
@@ -1771,6 +1619,13 @@
committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
)
);
+ return new PendingSegmentRecord(
+ pendingSegmentId,
+ request.getSequenceName(),
+ request.getPreviousSegmentId(),
+ request.getUpgradedFromSegmentId(),
+ request.getTaskAllocatorId()
+ );
}
}
@@ -1796,7 +1651,7 @@
final PartialShardSpec partialShardSpec,
final String existingVersion,
final List<TimelineObjectHolder<String, DataSegment>> existingChunks
- ) throws IOException
+ )
{
// max partitionId of published data segments which share the same partition space.
SegmentIdWithShardSpec committedMaxId = null;
@@ -1830,7 +1685,9 @@
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
- getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet()
+ getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).stream()
+ .map(PendingSegmentRecord::getId)
+ .collect(Collectors.toSet())
);
if (committedMaxId != null) {
pendings.add(committedMaxId);
@@ -2689,6 +2546,30 @@
}
@Override
+ public int deletePendingSegmentsForTaskGroup(final String pendingSegmentsGroup)
+ {
+ return connector.getDBI().inTransaction(
+ (handle, status) -> handle
+ .createStatement(
+ StringUtils.format(
+ "DELETE FROM %s WHERE task_allocator_id = :task_allocator_id",
+ dbTables.getPendingSegmentsTable()
+ )
+ )
+ .bind("task_allocator_id", pendingSegmentsGroup)
+ .execute()
+ );
+ }
+
+ @Override
+ public List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval)
+ {
+ return connector.retryWithHandle(
+ handle -> getPendingSegmentsForIntervalWithHandle(handle, datasource, interval)
+ );
+ }
+
+ @Override
public int deleteUpgradeSegmentsForTask(final String taskId)
{
return connector.getDBI().inTransaction(
diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
new file mode 100644
index 0000000..44c62bf
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.sql.ResultSet;
+
+/**
+ * Representation of a record in the pending segments table. <br/>
+ * Mapping of column in table to field:
+ *
+ * <ul>
+ * <li> id -> id (Unique identifier for pending segment) <li/>
+ * <li> sequence_name -> sequenceName (sequence name used for segment allocation) <li/>
+ * <li> sequence_prev_id -> sequencePrevId (previous segment id used for segment allocation) <li/>
+ * <li> upgraded_from_segment_id -> upgradedFromSegmentId (Id of the root segment from which this was upgraded) <li/>
+ * <li> task_allocator_id -> taskAllocatorId (Associates a task / task group / replica group with the pending segment) <li/>
+ * </ul>
+ */
+public class PendingSegmentRecord
+{
+ private final SegmentIdWithShardSpec id;
+ private final String sequenceName;
+ private final String sequencePrevId;
+ private final String upgradedFromSegmentId;
+ private final String taskAllocatorId;
+
+ public PendingSegmentRecord(
+ SegmentIdWithShardSpec id,
+ String sequenceName,
+ String sequencePrevId,
+ @Nullable String upgradedFromSegmentId,
+ @Nullable String taskAllocatorId
+ )
+ {
+ this.id = id;
+ this.sequenceName = sequenceName;
+ this.sequencePrevId = sequencePrevId;
+ this.upgradedFromSegmentId = upgradedFromSegmentId;
+ this.taskAllocatorId = taskAllocatorId;
+ }
+
+ public SegmentIdWithShardSpec getId()
+ {
+ return id;
+ }
+
+ public String getSequenceName()
+ {
+ return sequenceName;
+ }
+
+ public String getSequencePrevId()
+ {
+ return sequencePrevId;
+ }
+
+ /**
+ * The original pending segment using which this upgraded segment was created.
+ * Can be null for pending segments allocated before this column was added or for segments that have not been upgraded.
+ */
+ @Nullable
+ public String getUpgradedFromSegmentId()
+ {
+ return upgradedFromSegmentId;
+ }
+
+ /**
+ * task / taskGroup / replica group of task that allocated this segment.
+ * Can be null for pending segments allocated before this column was added.
+ */
+ @Nullable
+ public String getTaskAllocatorId()
+ {
+ return taskAllocatorId;
+ }
+
+ @SuppressWarnings("UnstableApiUsage")
+ public String computeSequenceNamePrevIdSha1(boolean skipSegmentLineageCheck)
+ {
+ final Hasher hasher = Hashing.sha1().newHasher()
+ .putBytes(StringUtils.toUtf8(getSequenceName()))
+ .putByte((byte) 0xff);
+
+ if (skipSegmentLineageCheck) {
+ final Interval interval = getId().getInterval();
+ hasher
+ .putLong(interval.getStartMillis())
+ .putLong(interval.getEndMillis());
+ } else {
+ hasher
+ .putBytes(StringUtils.toUtf8(getSequencePrevId()));
+ }
+
+ hasher.putByte((byte) 0xff);
+ hasher.putBytes(StringUtils.toUtf8(getId().getVersion()));
+
+ return BaseEncoding.base16().encode(hasher.hash().asBytes());
+ }
+
+ public static PendingSegmentRecord fromResultSet(ResultSet resultSet, ObjectMapper jsonMapper)
+ {
+ try {
+ final byte[] payload = resultSet.getBytes("payload");
+ return new PendingSegmentRecord(
+ jsonMapper.readValue(payload, SegmentIdWithShardSpec.class),
+ resultSet.getString("sequence_name"),
+ resultSet.getString("sequence_prev_id"),
+ resultSet.getString("upgraded_from_segment_id"),
+ resultSet.getString("task_allocator_id")
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index 6feaf9e..99d69f5 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -288,6 +288,7 @@
)
)
);
+ alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName);
}
public void createDataSourceTable(final String tableName)
@@ -460,6 +461,26 @@
}
}
+ private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String tableName)
+ {
+ List<String> statements = new ArrayList<>();
+ if (tableHasColumn(tableName, "upgraded_from_segment_id")) {
+ log.info("Table[%s] already has column[upgraded_from_segment_id].", tableName);
+ } else {
+ log.info("Adding column[upgraded_from_segment_id] to table[%s].", tableName);
+ statements.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN upgraded_from_segment_id VARCHAR(255)", tableName));
+ }
+ if (tableHasColumn(tableName, "task_allocator_id")) {
+ log.info("Table[%s] already has column[task_allocator_id].", tableName);
+ } else {
+ log.info("Adding column[task_allocator_id] to table[%s].", tableName);
+ statements.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN task_allocator_id VARCHAR(255)", tableName));
+ }
+ if (!statements.isEmpty()) {
+ alterTable(tableName, statements);
+ }
+ }
+
public void createLogTable(final String tableName, final String entryTypeName)
{
createTable(
diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
index 33641a8..57e01d7 100644
--- a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
+++ b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
@@ -35,7 +35,9 @@
"sequence",
null,
"version",
- partialShardSpec
+ partialShardSpec,
+ null,
+ null
);
Assert.assertEquals("sequence", request.getSequenceName());
Assert.assertEquals("", request.getPreviousSegmentId());
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 7b6fb4d..95ec1dd 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -25,8 +25,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.hash.Hashing;
-import com.google.common.io.BaseEncoding;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -471,44 +469,6 @@
);
}
- private Boolean insertPendingSegmentAndSequenceName(Pair<SegmentIdWithShardSpec, String> pendingSegmentSequenceName)
- {
- final SegmentIdWithShardSpec pendingSegment = pendingSegmentSequenceName.lhs;
- final String sequenceName = pendingSegmentSequenceName.rhs;
- final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable();
- return derbyConnector.retryWithHandle(
- handle -> {
- handle.createStatement(
- StringUtils.format(
- "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
- + "sequence_name_prev_id_sha1, payload) "
- + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
- + ":sequence_name_prev_id_sha1, :payload)",
- table,
- derbyConnector.getQuoteString()
- )
- )
- .bind("id", pendingSegment.toString())
- .bind("dataSource", pendingSegment.getDataSource())
- .bind("created_date", DateTimes.nowUtc().toString())
- .bind("start", pendingSegment.getInterval().getStart().toString())
- .bind("end", pendingSegment.getInterval().getEnd().toString())
- .bind("sequence_name", sequenceName)
- .bind("sequence_prev_id", pendingSegment.toString())
- .bind("sequence_name_prev_id_sha1", BaseEncoding.base16().encode(
- Hashing.sha1()
- .newHasher()
- .putLong((long) pendingSegment.hashCode() * sequenceName.hashCode())
- .hash()
- .asBytes()
- ))
- .bind("payload", mapper.writeValueAsBytes(pendingSegment))
- .execute();
- return true;
- }
- );
- }
-
private Map<String, String> getSegmentsCommittedDuringReplaceTask(String taskId)
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getUpgradeSegmentsTable();
@@ -620,7 +580,7 @@
// Commit the segment and verify the results
SegmentPublishResult commitResult
- = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock);
+ = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, "append");
Assert.assertTrue(commitResult.isSuccess());
Assert.assertEquals(appendSegments, commitResult.getSegments());
@@ -649,6 +609,30 @@
final ReplaceTaskLock replaceLock = new ReplaceTaskLock("g1", Intervals.of("2023-01-01/2023-02-01"), "2023-02-01");
final Set<DataSegment> segmentsAppendedWithReplaceLock = new HashSet<>();
final Map<DataSegment, ReplaceTaskLock> appendedSegmentToReplaceLockMap = new HashMap<>();
+ final PendingSegmentRecord pendingSegmentInInterval = new PendingSegmentRecord(
+ new SegmentIdWithShardSpec(
+ "foo",
+ Intervals.of("2023-01-01/2023-01-02"),
+ "2023-01-02",
+ new NumberedShardSpec(100, 0)
+ ),
+ "",
+ "",
+ null,
+ "append"
+ );
+ final PendingSegmentRecord pendingSegmentOutsideInterval = new PendingSegmentRecord(
+ new SegmentIdWithShardSpec(
+ "foo",
+ Intervals.of("2023-04-01/2023-04-02"),
+ "2023-01-02",
+ new NumberedShardSpec(100, 0)
+ ),
+ "",
+ "",
+ null,
+ "append"
+ );
for (int i = 1; i < 9; i++) {
final DataSegment segment = new DataSegment(
"foo",
@@ -665,6 +649,14 @@
appendedSegmentToReplaceLockMap.put(segment, replaceLock);
}
insertUsedSegments(segmentsAppendedWithReplaceLock);
+ derbyConnector.retryWithHandle(
+ handle -> coordinator.insertPendingSegmentsIntoMetastore(
+ handle,
+ ImmutableList.of(pendingSegmentInInterval, pendingSegmentOutsideInterval),
+ "foo",
+ true
+ )
+ );
insertIntoUpgradeSegmentsTable(appendedSegmentToReplaceLockMap);
final Set<DataSegment> replacingSegments = new HashSet<>();
@@ -709,6 +701,25 @@
}
Assert.assertTrue(hasBeenCarriedForward);
}
+
+ List<PendingSegmentRecord> pendingSegmentsInInterval =
+ coordinator.getPendingSegments("foo", Intervals.of("2023-01-01/2023-02-01"));
+ Assert.assertEquals(2, pendingSegmentsInInterval.size());
+ final SegmentId rootPendingSegmentId = pendingSegmentInInterval.getId().asSegmentId();
+ if (pendingSegmentsInInterval.get(0).getUpgradedFromSegmentId() == null) {
+ Assert.assertEquals(rootPendingSegmentId, pendingSegmentsInInterval.get(0).getId().asSegmentId());
+ Assert.assertEquals(rootPendingSegmentId.toString(), pendingSegmentsInInterval.get(1).getUpgradedFromSegmentId());
+ } else {
+ Assert.assertEquals(rootPendingSegmentId, pendingSegmentsInInterval.get(1).getId().asSegmentId());
+ Assert.assertEquals(rootPendingSegmentId.toString(), pendingSegmentsInInterval.get(0).getUpgradedFromSegmentId());
+ }
+
+ List<PendingSegmentRecord> pendingSegmentsOutsideInterval =
+ coordinator.getPendingSegments("foo", Intervals.of("2023-04-01/2023-05-01"));
+ Assert.assertEquals(1, pendingSegmentsOutsideInterval.size());
+ Assert.assertEquals(
+ pendingSegmentOutsideInterval.getId().asSegmentId(), pendingSegmentsOutsideInterval.get(0).getId().asSegmentId()
+ );
}
@Test
@@ -2416,7 +2427,8 @@
interval,
partialShardSpec,
"version",
- false
+ false,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString());
@@ -2428,7 +2440,8 @@
interval,
partialShardSpec,
identifier.getVersion(),
- false
+ false,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString());
@@ -2440,7 +2453,8 @@
interval,
partialShardSpec,
identifier1.getVersion(),
- false
+ false,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString());
@@ -2452,7 +2466,8 @@
interval,
partialShardSpec,
identifier1.getVersion(),
- false
+ false,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier3.toString());
@@ -2465,7 +2480,8 @@
interval,
partialShardSpec,
"version",
- false
+ false,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_3", identifier4.toString());
@@ -2501,7 +2517,8 @@
interval,
partialShardSpec,
"version",
- true
+ true,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString());
// Since there are no used core partitions yet
@@ -2515,7 +2532,8 @@
interval,
partialShardSpec,
maxVersion,
- true
+ true,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString());
// Since there are no used core partitions yet
@@ -2529,7 +2547,8 @@
interval,
partialShardSpec,
maxVersion,
- true
+ true,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString());
// Since there are no used core partitions yet
@@ -2559,7 +2578,8 @@
interval,
partialShardSpec,
maxVersion,
- true
+ true,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_1", identifier3.toString());
// Used segment set has 1 core partition
@@ -2577,7 +2597,8 @@
interval,
partialShardSpec,
maxVersion,
- true
+ true,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2", identifier4.toString());
// Since all core partitions have been dropped
@@ -2610,7 +2631,8 @@
interval,
partialShardSpec,
"A",
- true
+ true,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", identifier.toString());
// Assume it publishes; create its corresponding segment
@@ -2638,7 +2660,8 @@
interval,
partialShardSpec,
maxVersion,
- true
+ true,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier1.toString());
// Assume it publishes; create its corresponding segment
@@ -2666,7 +2689,8 @@
interval,
partialShardSpec,
maxVersion,
- true
+ true,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2", identifier2.toString());
// Assume it publishes; create its corresponding segment
@@ -2720,7 +2744,8 @@
interval,
partialShardSpec,
maxVersion,
- true
+ true,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B_1", identifier3.toString());
// no corresponding segment, pending aborted
@@ -2754,7 +2779,8 @@
interval,
partialShardSpec,
maxVersion,
- true
+ true,
+ null
);
// maxid = B_1 -> new partno = 2
// versionofexistingchunk=A
@@ -2791,7 +2817,7 @@
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
final String sequenceName = "seq";
- final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec);
+ final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId0 = coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2802,7 +2828,7 @@
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString());
final SegmentCreateRequest request1 =
- new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec);
+ new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2813,7 +2839,7 @@
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString());
final SegmentCreateRequest request2 =
- new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec);
+ new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2824,7 +2850,7 @@
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString());
final SegmentCreateRequest request3 =
- new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec);
+ new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2836,7 +2862,7 @@
Assert.assertEquals(segmentId2, segmentId3);
final SegmentCreateRequest request4 =
- new SegmentCreateRequest("seq1", null, "v1", partialShardSpec);
+ new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId4 = coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2880,7 +2906,8 @@
interval,
partialShardSpec,
maxVersion,
- true
+ true,
+ null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString());
@@ -2905,7 +2932,8 @@
interval,
partialShardSpec,
"version",
- false
+ false,
+ null
);
prevSegmentId = identifier.toString();
}
@@ -2920,7 +2948,8 @@
interval,
partialShardSpec,
"version",
- false
+ false,
+ null
);
prevSegmentId = identifier.toString();
}
@@ -2947,7 +2976,8 @@
interval,
new NumberedOverwritePartialShardSpec(0, 1, (short) (i + 1)),
"version",
- false
+ false,
+ null
);
Assert.assertEquals(
StringUtils.format(
@@ -3015,7 +3045,8 @@
interval,
partialShardSpec,
"version",
- true
+ true,
+ null
);
HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
@@ -3046,7 +3077,8 @@
interval,
partialShardSpec,
"version",
- true
+ true,
+ null
);
shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
@@ -3077,7 +3109,8 @@
interval,
new HashBasedNumberedPartialShardSpec(null, 2, 3, null),
"version",
- true
+ true,
+ null
);
shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
@@ -3124,7 +3157,8 @@
interval,
NumberedPartialShardSpec.instance(),
version,
- false
+ false,
+ null
);
Assert.assertNull(id);
}
@@ -3169,7 +3203,8 @@
interval,
NumberedPartialShardSpec.instance(),
version,
- false
+ false,
+ null
);
Assert.assertNull(id);
}
@@ -3330,64 +3365,6 @@
}
@Test
- public void testGetPendingSegmentsForIntervalWithSequencePrefixes()
- {
- Pair<SegmentIdWithShardSpec, String> validIntervalValidSequence = Pair.of(
- SegmentIdWithShardSpec.fromDataSegment(defaultSegment),
- "validLOL"
- );
- insertPendingSegmentAndSequenceName(validIntervalValidSequence);
-
- Pair<SegmentIdWithShardSpec, String> validIntervalInvalidSequence = Pair.of(
- SegmentIdWithShardSpec.fromDataSegment(defaultSegment2),
- "invalidRandom"
- );
- insertPendingSegmentAndSequenceName(validIntervalInvalidSequence);
-
- Pair<SegmentIdWithShardSpec, String> invalidIntervalvalidSequence = Pair.of(
- SegmentIdWithShardSpec.fromDataSegment(existingSegment1),
- "validStuff"
- );
- insertPendingSegmentAndSequenceName(invalidIntervalvalidSequence);
-
- Pair<SegmentIdWithShardSpec, String> twentyFifteenWithAnotherValidSequence = Pair.of(
- new SegmentIdWithShardSpec(
- existingSegment1.getDataSource(),
- Intervals.of("2015/2016"),
- "1970-01-01",
- new NumberedShardSpec(1, 0)
- ),
- "alsoValidAgain"
- );
- insertPendingSegmentAndSequenceName(twentyFifteenWithAnotherValidSequence);
-
- Pair<SegmentIdWithShardSpec, String> twentyFifteenWithInvalidSequence = Pair.of(
- new SegmentIdWithShardSpec(
- existingSegment1.getDataSource(),
- Intervals.of("2015/2016"),
- "1970-01-01",
- new NumberedShardSpec(2, 0)
- ),
- "definitelyInvalid"
- );
- insertPendingSegmentAndSequenceName(twentyFifteenWithInvalidSequence);
-
-
- final Map<SegmentIdWithShardSpec, String> expected = new HashMap<>();
- expected.put(validIntervalValidSequence.lhs, validIntervalValidSequence.rhs);
- expected.put(twentyFifteenWithAnotherValidSequence.lhs, twentyFifteenWithAnotherValidSequence.rhs);
-
- final Map<SegmentIdWithShardSpec, String> actual =
- derbyConnector.retryWithHandle(handle -> coordinator.getPendingSegmentsForIntervalWithHandle(
- handle,
- defaultSegment.getDataSource(),
- defaultSegment.getInterval(),
- ImmutableSet.of("valid", "alsoValid")
- ));
- Assert.assertEquals(expected, actual);
- }
-
- @Test
public void testRetrieveUsedSegmentsAndCreatedDates()
{
insertUsedSegments(ImmutableSet.of(defaultSegment));
@@ -3471,7 +3448,8 @@
interval,
NumberedPartialShardSpec.instance(),
"version",
- false
+ false,
+ null
);
Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());
@@ -3525,7 +3503,8 @@
interval,
NumberedPartialShardSpec.instance(),
"version",
- false
+ false,
+ null
);
Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());