Associate pending segments with the tasks that requested them (#16144)

Changes:
- Add column `task_allocator_id` to `pendingSegments` metadata table.
- Add column `upgraded_from_segment_id` to `pendingSegments` metadata table.
- Add interface `PendingSegmentAllocatingTask` and implement it in all tasks that
can allocate pending segments.
- Use `taskAllocatorId` to identify the task (and its sub-tasks or replicas) to which
a pending segment has been allocated.
- Perform active cleanup of pending segments in `TaskLockbox` once there are no
active tasks for the corresponding task allocator id.
- When committing APPEND segments, also commit all upgraded pending segments
corresponding to that task allocator id.
- When committing REPLACE segments, upgrade all overlapping pending segments in
the same transaction.
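
The heart of the change is the allocator-id bookkeeping in `TaskLockbox`: every task that can allocate pending segments reports a `taskAllocatorId`, and pending-segment cleanup fires only once every active task using that id has exited. Below is a minimal, self-contained sketch of that bookkeeping; it is not part of this patch, and apart from `PendingSegmentAllocatingTask` the names in it are hypothetical simplifications.

```java
// A minimal sketch (not part of this patch) of the allocator-id bookkeeping
// added to TaskLockbox. Only PendingSegmentAllocatingTask is real; the class
// and method names below are hypothetical simplifications.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AllocatorGroupSketch
{
  interface PendingSegmentAllocatingTask
  {
    String getTaskAllocatorId();
  }

  // Mirrors TaskLockbox#activeAllocatorIdToTaskIds.
  private final Map<String, Set<String>> activeAllocatorIdToTaskIds = new HashMap<>();

  void trackTask(String taskId, PendingSegmentAllocatingTask task)
  {
    activeAllocatorIdToTaskIds
        .computeIfAbsent(task.getTaskAllocatorId(), id -> new HashSet<>())
        .add(taskId);
  }

  /**
   * Returns true only when the last task of the group exits, i.e. when the
   * pendingSegments rows for this allocator id may be deleted.
   */
  boolean untrackTask(String taskId, PendingSegmentAllocatingTask task)
  {
    final Set<String> group = activeAllocatorIdToTaskIds.get(task.getTaskAllocatorId());
    if (group == null) {
      return false;
    }
    group.remove(taskId);
    if (group.isEmpty()) {
      activeAllocatorIdToTaskIds.remove(task.getTaskAllocatorId());
      return true;
    }
    return false;
  }

  public static void main(String[] args)
  {
    final AllocatorGroupSketch lockbox = new AllocatorGroupSketch();
    // Two replicas of a streaming task share one allocator id
    // (their availability group), so they form a single group.
    final PendingSegmentAllocatingTask replica = () -> "index_kafka_wiki_abc";
    lockbox.trackTask("replica-1", replica);
    lockbox.trackTask("replica-2", replica);
    System.out.println(lockbox.untrackTask("replica-1", replica)); // false: replica-2 still active
    System.out.println(lockbox.untrackTask("replica-2", replica)); // true: group empty, clean up
  }
}
```

Which id a task reports is what makes the grouping work: MSQ workers return their controller's task id, batch sub-tasks return their `groupId`, and streaming tasks return their availability group, so the sub-tasks and replicas of one job always land in the same group.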

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index 64cecdf..7bb57c5 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -40,6 +40,7 @@
 import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -71,7 +72,7 @@
 import java.util.Set;
 
 @JsonTypeName(MSQControllerTask.TYPE)
-public class MSQControllerTask extends AbstractTask implements ClientTaskQuery
+public class MSQControllerTask extends AbstractTask implements ClientTaskQuery, PendingSegmentAllocatingTask
 {
   public static final String TYPE = "query_controller";
   public static final String DUMMY_DATASOURCE_FOR_SELECT = "__query_select";
@@ -157,6 +158,12 @@
     return ImmutableSet.of();
   }
 
+  @Override
+  public String getTaskAllocatorId()
+  {
+    return getId();
+  }
+
   @JsonProperty("spec")
   public MSQSpec getQuerySpec()
   {
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
index 95abc6b..c049484 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
@@ -32,6 +32,7 @@
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.msq.exec.MSQTasks;
 import org.apache.druid.msq.exec.Worker;
@@ -45,7 +46,7 @@
 import java.util.Set;
 
 @JsonTypeName(MSQWorkerTask.TYPE)
-public class MSQWorkerTask extends AbstractTask
+public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask
 {
   public static final String TYPE = "query_worker";
 
@@ -125,6 +126,12 @@
     return ImmutableSet.of();
   }
 
+  @Override
+  public String getTaskAllocatorId()
+  {
+    return getControllerTaskId();
+  }
+
 
   @Override
   public boolean isReady(final TaskActionClient taskActionClient)
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
index 6aaf21e..309eb38 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
@@ -35,7 +35,7 @@
 
 public class MSQControllerTaskTest
 {
-  MSQSpec MSQ_SPEC = MSQSpec
+  private final MSQSpec MSQ_SPEC = MSQSpec
       .builder()
       .destination(new DataSourceMSQDestination(
           "target",
@@ -59,7 +59,7 @@
   @Test
   public void testGetInputSourceResources()
   {
-    MSQControllerTask msqWorkerTask = new MSQControllerTask(
+    MSQControllerTask controllerTask = new MSQControllerTask(
         null,
         MSQ_SPEC,
         null,
@@ -67,7 +67,25 @@
         null,
         null,
         null,
-        null);
-    Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
+        null
+    );
+    Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty());
+  }
+
+  @Test
+  public void testGetTaskAllocatorId()
+  {
+    final String taskId = "taskId";
+    MSQControllerTask controllerTask = new MSQControllerTask(
+        taskId,
+        MSQ_SPEC,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(taskId, controllerTask.getTaskAllocatorId());
   }
 }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
index 37c31ba..482d67d 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
@@ -47,7 +47,6 @@
   @Test
   public void testEquals()
   {
-    Assert.assertNotEquals(msqWorkerTask, 0);
     Assert.assertEquals(msqWorkerTask, msqWorkerTask);
     Assert.assertEquals(
         msqWorkerTask,
@@ -110,4 +109,11 @@
     Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
   }
 
+  @Test
+  public void testGetTaskAllocatorId()
+  {
+    MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
+    Assert.assertEquals(controllerTaskId, msqWorkerTask.getTaskAllocatorId());
+  }
+
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
index 280f419..d030851 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
@@ -23,8 +23,10 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.LockRequestForNewSegment;
@@ -210,6 +212,12 @@
       final TaskActionToolbox toolbox
   )
   {
+    if (!(task instanceof PendingSegmentAllocatingTask)) {
+      throw DruidException.defensive(
+          "Task[%s] of type[%s] cannot allocate segments as it does not implement PendingSegmentAllocatingTask.",
+          task.getId(), task.getType()
+      );
+    }
     int attempt = 0;
     while (true) {
       attempt++;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
index 67b7017..1a1e6c7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
@@ -22,10 +22,12 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.CriticalAction;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -41,8 +43,20 @@
 import java.util.stream.Collectors;
 
 /**
+ *
  * Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
  * your task for the segment intervals.
+ *
+ * <pre>
+ * Pseudo code (for a single interval):
+ * For an append lock held over an interval:
+ *     transaction {
+ *       commit input segments contained within interval
+ *       if there is an active replace lock over the interval:
+ *         add an entry mapping the input segment to the replace lock's task in the upgradeSegments table
+ *       fetch pending segments whose parent is among the input segments, and commit them
+ *     }
+ * </pre>
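+ *
+ * For example (segment ids are hypothetical): if a pending segment P2 was
+ * allocated as an upgraded version of pending segment P1, i.e. its
+ * upgraded_from_segment_id points at P1, then committing P1's data as an
+ * input segment also commits P2 in the same transaction.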
  */
 public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
 {
@@ -114,6 +128,13 @@
   @Override
   public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
   {
+    if (!(task instanceof PendingSegmentAllocatingTask)) {
+      throw DruidException.defensive(
+          "Task[%s] of type[%s] cannot append segments as it does not implement PendingSegmentAllocatingTask.",
+          task.getId(),
+          task.getType()
+      );
+    }
     // Verify that all the locks are of expected type
     final List<TaskLock> locks = toolbox.getTaskLockbox().findLocksForTask(task);
     for (TaskLock lock : locks) {
@@ -132,17 +153,20 @@
         = TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);
 
     final CriticalAction.Action<SegmentPublishResult> publishAction;
+    final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
     if (startMetadata == null) {
       publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
           segments,
-          segmentToReplaceLock
+          segmentToReplaceLock,
+          taskAllocatorId
       );
     } else {
       publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
           segments,
           segmentToReplaceLock,
           startMetadata,
-          endMetadata
+          endMetadata,
+          taskAllocatorId
       );
     }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
index aaa62db..2f4a580 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
@@ -30,11 +30,14 @@
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -42,6 +45,20 @@
 /**
  * Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by
  * your task for the segment intervals.
+ *
+ * <pre>
+ * Pseudo code (for a single interval):
+ * For a replace lock held over an interval:
+ *     transaction {
+ *       commit input segments contained within interval
+ *       upgrade segment ids in the upgradeSegments table corresponding to this task to the replace lock's version and commit them
+ *       fetch the payload and task_allocator_id of each pending segment overlapping the interval
+ *       upgrade each such pending segment to the replace lock's version with the corresponding root segment
+ *     }
+ * For every pending segment with version == replace lock version:
+ *     fetch the payload and group_id of the pending segment and relay them to the supervisor
+ *     the supervisor relays the payloads to all tasks with the corresponding group_id to serve realtime queries
+ * </pre>
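+ *
+ * For example (ids and versions are hypothetical): a pending segment first
+ * allocated with version 1970-01-01T00:00:00.000Z can be upgraded to the
+ * replace lock's version; the upgraded entry's upgraded_from_segment_id then
+ * points at the original pending segment's id.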
  */
 public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult>
 {
@@ -123,7 +140,7 @@
     // failure to upgrade pending segments does not affect success of the commit
     if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
       try {
-        tryUpgradeOverlappingPendingSegments(task, toolbox);
+        registerUpgradedPendingSegmentsOnSupervisor(task, toolbox);
       }
       catch (Exception e) {
         log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
@@ -134,34 +151,55 @@
   }
 
   /**
-   * Tries to upgrade any pending segments that overlap with the committed segments.
+   * Registers upgraded pending segments on the active supervisor, if any
    */
-  private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
+  private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox)
   {
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
         supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
     if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
-    final Set<String> activeRealtimeSequencePrefixes
-        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator()
-               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
-    log.info(
-        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
-        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
-    );
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> task.getId().equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
 
-    upgradedPendingSegments.forEach(
-        (oldId, newId) -> toolbox.getSupervisorManager()
-                                 .registerNewVersionOfPendingSegmentOnSupervisor(
-                                     activeSupervisorIdWithAppendLock.get(),
-                                     oldId,
-                                     newId
-                                 )
+
+    Set<PendingSegmentRecord> pendingSegments = new HashSet<>();
+    for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+      pendingSegments.addAll(
+          toolbox.getIndexerMetadataStorageCoordinator()
+                 .getPendingSegments(task.getDataSource(), replaceLock.getInterval())
+      );
+    }
+    Map<String, SegmentIdWithShardSpec> idToPendingSegment = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> idToPendingSegment.put(
+        pendingSegment.getId().asSegmentId().toString(),
+        pendingSegment.getId()
+    ));
+    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> {
+      if (pendingSegment.getUpgradedFromSegmentId() != null
+          && !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) {
+        segmentToParent.put(
+            pendingSegment.getId(),
+            idToPendingSegment.get(pendingSegment.getUpgradedFromSegmentId())
+        );
+      }
+    });
+
+    segmentToParent.forEach(
+        (newId, oldId) -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
+            activeSupervisorIdWithAppendLock.get(),
+            oldId,
+            newId
+        )
     );
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 1c581cd..4275926 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -119,7 +119,8 @@
 import java.util.concurrent.TimeoutException;
 
 @Deprecated
-public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements ChatHandler
+public class AppenderatorDriverRealtimeIndexTask extends AbstractTask
+    implements ChatHandler, PendingSegmentAllocatingTask
 {
   private static final String CTX_KEY_LOOKUP_TIER = "lookupTier";
 
@@ -260,6 +261,12 @@
   }
 
   @Override
+  public String getTaskAllocatorId()
+  {
+    return getGroupId();
+  }
+
+  @Override
   public TaskStatus runTask(final TaskToolbox toolbox)
   {
     runThread = Thread.currentThread();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index e5df64c..f5aec08 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -127,7 +127,7 @@
  * serialization fields of this class must correspond to those of {@link
  * ClientCompactionTaskQuery}.
  */
-public class CompactionTask extends AbstractBatchIndexTask
+public class CompactionTask extends AbstractBatchIndexTask implements PendingSegmentAllocatingTask
 {
   private static final Logger log = new Logger(CompactionTask.class);
   private static final Clock UTC_CLOCK = Clock.systemUTC();
@@ -400,6 +400,12 @@
     return TYPE;
   }
 
+  @Override
+  public String getTaskAllocatorId()
+  {
+    return getGroupId();
+  }
+
   @Nonnull
   @JsonIgnore
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 329bc0d..1796f6e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -137,7 +137,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
+public class IndexTask extends AbstractBatchIndexTask implements ChatHandler, PendingSegmentAllocatingTask
 {
 
   public static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();
@@ -302,6 +302,12 @@
     }
   }
 
+  @Override
+  public String getTaskAllocatorId()
+  {
+    return getGroupId();
+  }
+
   @Nonnull
   @JsonIgnore
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index 678d8ca..9d91542 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -39,7 +39,7 @@
 
 /**
  */
-public class NoopTask extends AbstractTask
+public class NoopTask extends AbstractTask implements PendingSegmentAllocatingTask
 {
   private static final int DEFAULT_RUN_TIME = 2500;
 
@@ -111,6 +111,12 @@
     return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
   }
 
+  @Override
+  public String getTaskAllocatorId()
+  {
+    return getId();
+  }
+
   public static NoopTask create()
   {
     return forDatasource(null);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java
new file mode 100644
index 0000000..e392adc
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+/**
+ * An interface to be implemented by every appending task that allocates pending segments.
+ */
+public interface PendingSegmentAllocatingTask
+{
+  /**
+   * Unique string used by an appending task (or its sub-tasks and replicas) to allocate pending segments
+   * and identify pending segments allocated to it.
+   */
+  String getTaskAllocatorId();
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 5d72f4a..935adb3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -52,6 +52,7 @@
 import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
@@ -131,7 +132,8 @@
  *
  * @see ParallelIndexTaskRunner
  */
-public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implements ChatHandler
+public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
+    implements ChatHandler, PendingSegmentAllocatingTask
 {
   public static final String TYPE = "index_parallel";
 
@@ -476,6 +478,12 @@
     );
   }
 
+  @Override
+  public String getTaskAllocatorId()
+  {
+    return getGroupId();
+  }
+
   @Nullable
   @Override
   public Granularity getSegmentGranularity()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 0b4f62e..e02d599 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -43,6 +43,7 @@
 import org.apache.druid.indexing.common.task.BatchAppenderators;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
 import org.apache.druid.indexing.common.task.SegmentAllocators;
 import org.apache.druid.indexing.common.task.TaskResource;
@@ -105,7 +106,7 @@
  * generates and pushes segments, and reports them to the {@link SinglePhaseParallelIndexTaskRunner} instead of
  * publishing on its own.
  */
-public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler
+public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler, PendingSegmentAllocatingTask
 {
   public static final String TYPE = "single_phase_sub_task";
   public static final String OLD_TYPE_NAME = "index_sub";
@@ -237,6 +238,12 @@
   }
 
   @Override
+  public String getTaskAllocatorId()
+  {
+    return getGroupId();
+  }
+
+  @Override
   public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
   {
     try {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 54e2919..35ec79d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -28,6 +28,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Inject;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.LockGranularity;
@@ -38,6 +39,7 @@
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
 import org.apache.druid.indexing.common.actions.SegmentAllocateResult;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.ISE;
@@ -99,10 +101,20 @@
 
   private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
 
-  // Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks.
-  // this set should be accessed under the giant lock.
+  /**
+   * Set of active tasks. Locks can be granted only to a task present in this set.
+   * Should be accessed only under the giant lock.
+   */
   private final Set<String> activeTasks = new HashSet<>();
 
+  /**
+   * Map from a taskAllocatorId to the set of active taskIds using that allocator id.
+   * Used to clean up pending segments for a taskAllocatorId as soon as the set
+   * of corresponding active taskIds becomes empty.
+   */
+  @GuardedBy("giant")
+  private final Map<String, Set<String>> activeAllocatorIdToTaskIds = new HashMap<>();
+
   @Inject
   public TaskLockbox(
       TaskStorage taskStorage,
@@ -213,6 +225,12 @@
           activeTasks.remove(task.getId());
         }
       }
+      activeAllocatorIdToTaskIds.clear();
+      for (Task task : storedActiveTasks) {
+        if (activeTasks.contains(task.getId())) {
+          trackAppendingTask(task);
+        }
+      }
 
       log.info(
           "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).",
@@ -387,7 +405,7 @@
       if (request instanceof LockRequestForNewSegment) {
         final LockRequestForNewSegment lockRequestForNewSegment = (LockRequestForNewSegment) request;
         if (lockRequestForNewSegment.getGranularity() == LockGranularity.SEGMENT) {
-          newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion());
+          newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion(), null);
           if (newSegmentId == null) {
             return LockResult.fail();
           }
@@ -411,7 +429,12 @@
                   newSegmentId
               );
             }
-            newSegmentId = allocateSegmentId(lockRequestForNewSegment, posseToUse.getTaskLock().getVersion());
+            final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
+            newSegmentId = allocateSegmentId(
+                lockRequestForNewSegment,
+                posseToUse.getTaskLock().getVersion(),
+                taskAllocatorId
+            );
           }
         }
         return LockResult.ok(posseToUse.getTaskLock(), newSegmentId);
@@ -514,6 +537,7 @@
     }
   }
 
+  @Nullable
   private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task, boolean persist)
   {
     Preconditions.checkState(!(request instanceof LockRequestForNewSegment), "Can't handle LockRequestForNewSegment");
@@ -710,7 +734,7 @@
     }
   }
 
-  private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment request, String version)
+  private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment request, String version, String allocatorId)
   {
     return metadataStorageCoordinator.allocatePendingSegment(
         request.getDataSource(),
@@ -719,7 +743,8 @@
         request.getInterval(),
         request.getPartialShardSpec(),
         version,
-        request.isSkipSegmentLineageCheck()
+        request.isSkipSegmentLineageCheck(),
+        allocatorId
     );
   }
 
@@ -1159,12 +1184,25 @@
     try {
       log.info("Adding task[%s] to activeTasks", task.getId());
       activeTasks.add(task.getId());
+      trackAppendingTask(task);
     }
     finally {
       giant.unlock();
     }
   }
 
+  @GuardedBy("giant")
+  private void trackAppendingTask(Task task)
+  {
+    if (task instanceof PendingSegmentAllocatingTask) {
+      final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
+      if (taskAllocatorId != null) {
+        activeAllocatorIdToTaskIds.computeIfAbsent(taskAllocatorId, s -> new HashSet<>())
+                                  .add(task.getId());
+      }
+    }
+  }
+
   /**
    * Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task.
    *
@@ -1176,13 +1214,35 @@
     try {
       try {
         log.info("Removing task[%s] from activeTasks", task.getId());
-        if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
-          final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
-          log.info(
-              "Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
-              upgradeSegmentsDeleted,
-              task.getId()
-          );
+        try {
+          // Clean up upgradeSegments table entries associated with the replacing task
+          if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
+            final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
+            log.info(
+                "Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
+                upgradeSegmentsDeleted, task.getId()
+            );
+          }
+          // Clean pending segments associated with the appending task
+          if (task instanceof PendingSegmentAllocatingTask) {
+            final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
+            if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
+              final Set<String> idsInSameGroup = activeAllocatorIdToTaskIds.get(taskAllocatorId);
+              idsInSameGroup.remove(task.getId());
+              if (idsInSameGroup.isEmpty()) {
+                final int pendingSegmentsDeleted
+                    = metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId);
+                log.info(
+                    "Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.",
+                    pendingSegmentsDeleted, taskAllocatorId
+                );
+                activeAllocatorIdToTaskIds.remove(taskAllocatorId);
+              }
+            }
+          }
+        }
+        catch (Exception e) {
+          log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments tables.");
         }
         unlockAll(task);
       }
@@ -1771,7 +1831,9 @@
             action.getSequenceName(),
             action.getPreviousSegmentId(),
             acquiredLock == null ? lockRequest.getVersion() : acquiredLock.getVersion(),
-            action.getPartialShardSpec()
+            action.getPartialShardSpec(),
+            null,
+            ((PendingSegmentAllocatingTask) task).getTaskAllocatorId()
         );
       }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 810a991..dd57b56 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -39,7 +39,6 @@
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -126,15 +125,6 @@
     return Optional.absent();
   }
 
-  public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
-  {
-    if (supervisors.containsKey(activeSupervisorId)) {
-      return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes();
-    } else {
-      return Collections.emptySet();
-    }
-  }
-
   public Optional<SupervisorSpec> getSupervisorSpec(String id)
   {
     Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
@@ -340,7 +330,7 @@
       return true;
     }
     catch (Exception e) {
-      log.error(e, "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed",
+      log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed",
                 basePendingSegment.asSegmentId(), newSegmentVersion.getVersion(), supervisorId);
     }
     return false;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 5450901..0ec9a67 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -37,6 +37,7 @@
 import org.apache.druid.indexing.common.actions.TaskLocks;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -61,7 +62,7 @@
 
 
 public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
-    extends AbstractTask implements ChatHandler
+    extends AbstractTask implements ChatHandler, PendingSegmentAllocatingTask
 {
   public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
   private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);
@@ -269,6 +270,12 @@
     return !beforeMinimumMessageTime && !afterMaximumMessageTime;
   }
 
+  @Override
+  public String getTaskAllocatorId()
+  {
+    return getTaskResource().getAvailabilityGroup();
+  }
+
   protected abstract SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType> createTaskRunner();
 
   /**
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 032142b..44f4ee1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -60,6 +60,7 @@
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
@@ -118,6 +119,7 @@
   private SegmentsMetadataManager segmentsMetadataManager;
   private TaskLockbox lockbox;
   private File baseDir;
+  private SupervisorManager supervisorManager;
   protected File reportsFile;
 
   @Before
@@ -227,13 +229,14 @@
         taskStorage,
         storageCoordinator,
         new NoopServiceEmitter(),
-        null,
+        supervisorManager,
         objectMapper
     );
   }
 
-  public TaskToolbox createTaskToolbox(TaskConfig config, Task task)
+  public TaskToolbox createTaskToolbox(TaskConfig config, Task task, SupervisorManager supervisorManager)
   {
+    this.supervisorManager = supervisorManager;
     return new TaskToolbox.Builder()
         .config(config)
         .taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
index 69a8b6c..230cfa4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
@@ -36,10 +36,13 @@
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -49,6 +52,7 @@
 {
   private final TaskActionClient client;
   private final AtomicInteger sequenceId = new AtomicInteger(0);
+  private final Map<SegmentId, SegmentId> announcedSegmentsToParentSegments = new HashMap<>();
 
   public ActionsTestTask(String datasource, String groupId, TaskActionClientFactory factory)
   {
@@ -78,16 +82,25 @@
     );
   }
 
+  public Map<SegmentId, SegmentId> getAnnouncedSegmentsToParentSegments()
+  {
+    return announcedSegmentsToParentSegments;
+  }
+
   public SegmentPublishResult commitAppendSegments(DataSegment... segments)
   {
-    return runAction(
+    SegmentPublishResult publishResult = runAction(
         SegmentTransactionalAppendAction.forSegments(Sets.newHashSet(segments))
     );
+    for (DataSegment segment : publishResult.getSegments()) {
+      announcedSegmentsToParentSegments.remove(segment.getId());
+    }
+    return publishResult;
   }
 
   public SegmentIdWithShardSpec allocateSegmentForTimestamp(DateTime timestamp, Granularity preferredSegmentGranularity)
   {
-    return runAction(
+    SegmentIdWithShardSpec pendingSegment = runAction(
         new SegmentAllocateAction(
             getDataSource(),
             timestamp,
@@ -101,28 +114,8 @@
             TaskLockType.APPEND
         )
     );
-  }
-
-  public SegmentIdWithShardSpec allocateSegmentForTimestamp(
-      DateTime timestamp,
-      Granularity preferredSegmentGranularity,
-      String sequenceName
-  )
-  {
-    return runAction(
-        new SegmentAllocateAction(
-            getDataSource(),
-            timestamp,
-            Granularities.SECOND,
-            preferredSegmentGranularity,
-            getId() + "__" + sequenceName,
-            null,
-            false,
-            NumberedPartialShardSpec.instance(),
-            LockGranularity.TIME_CHUNK,
-            TaskLockType.APPEND
-        )
-    );
+    announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId());
+    return pendingSegment;
   }
 
   private <T> T runAction(TaskAction<T> action)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java
index 08e2c18..5945d14 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java
@@ -24,6 +24,7 @@
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -40,7 +41,7 @@
 /**
  * Test task that can be given a series of commands to execute in its {@link #runTask} method.
  */
-public class CommandQueueTask extends AbstractTask
+public class CommandQueueTask extends AbstractTask implements PendingSegmentAllocatingTask
 {
   private static final Logger log = new Logger(CommandQueueTask.class);
 
@@ -141,6 +142,12 @@
   }
 
   @Override
+  public String getTaskAllocatorId()
+  {
+    return getId();
+  }
+
+  @Override
   public TaskStatus runTask(TaskToolbox taskToolbox)
   {
     TaskStatus status;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index cc498c7..415c63a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -728,6 +728,7 @@
     // Append segment for Oct-Dec
     final DataSegment segmentV02 = asSegment(pendingSegment02);
     appendTask2.commitAppendSegments(segmentV02);
+    appendTask2.finishRunAndGetStatus();
     verifyIntervalHasUsedSegments(YEAR_23, segmentV02);
     verifyIntervalHasVisibleSegments(YEAR_23, segmentV02);
 
@@ -747,12 +748,14 @@
     // Append segment for Jan 1st
     final DataSegment segmentV01 = asSegment(pendingSegment01);
     appendTask.commitAppendSegments(segmentV01);
+    appendTask.finishRunAndGetStatus();
     verifyIntervalHasUsedSegments(YEAR_23, segmentV01, segmentV02);
     verifyIntervalHasVisibleSegments(YEAR_23, segmentV01, segmentV02);
 
     // Replace segment for whole year
     final DataSegment segmentV10 = createSegment(YEAR_23, v1);
     replaceTask.commitReplaceSegments(segmentV10);
+    replaceTask.finishRunAndGetStatus();
 
     final DataSegment segmentV11 = DataSegment.builder(segmentV01)
                                               .version(v1)
@@ -767,6 +770,7 @@
     // Append segment for quarter
     final DataSegment segmentV03 = asSegment(pendingSegment03);
     appendTask3.commitAppendSegments(segmentV03);
+    appendTask3.finishRunAndGetStatus();
 
     final DataSegment segmentV13 = DataSegment.builder(segmentV03)
                                               .version(v1)
@@ -1021,7 +1025,7 @@
       @Override
       public TaskToolbox build(TaskConfig config, Task task)
       {
-        return createTaskToolbox(config, task);
+        return createTaskToolbox(config, task, null);
       }
     };
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
new file mode 100644
index 0000000..50c3186
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
@@ -0,0 +1,881 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.concurrent;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskStorageDirTracker;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.config.TaskConfigBuilder;
+import org.apache.druid.indexing.common.task.IngestionTestBase;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.TaskQueue;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TestTaskToolboxFactory;
+import org.apache.druid.indexing.overlord.ThreadingTaskRunner;
+import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Contains tests to verify behaviour of concurrently running REPLACE and APPEND
+ * tasks on the same interval of a datasource.
+ * <p>
+ * The tests verify the interleaving of the following actions:
+ * <ul>
+ *   <li>LOCK: Acquisition of a lock on an interval by a replace task</li>
+ *   <li>ALLOCATE: Allocation of a pending segment by an append task</li>
+ *   <li>REPLACE: Commit of segments created by a replace task</li>
+ *   <li>APPEND: Commit of segments created by an append task</li>
+ * </ul>
+ */
+public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
+{
+  /**
+   * The version used by append jobs when no previous replace job has run on an interval.
+   */
+  private static final String SEGMENT_V0 = DateTimes.EPOCH.toString();
+
+  private static final Interval JAN_23 = Intervals.of("2023-01/2023-02");
+  private static final Interval FIRST_OF_JAN_23 = Intervals.of("2023-01-01/2023-01-02");
+
+  private static final String WIKI = "wiki";
+
+  private TaskQueue taskQueue;
+  private TaskActionClientFactory taskActionClientFactory;
+  private TaskActionClient dummyTaskActionClient;
+  private final List<ActionsTestTask> runningTasks = new ArrayList<>();
+
+  private ActionsTestTask appendTask;
+  private ActionsTestTask replaceTask;
+
+  private final AtomicInteger groupId = new AtomicInteger(0);
+  private final SupervisorManager supervisorManager = EasyMock.mock(SupervisorManager.class);
+  private Capture<String> supervisorId;
+  private Capture<SegmentIdWithShardSpec> oldPendingSegment;
+  private Capture<SegmentIdWithShardSpec> newPendingSegment;
+  private Map<String, Map<Interval, Set<Object>>> versionToIntervalToLoadSpecs;
+  private Map<SegmentId, Object> parentSegmentToLoadSpec;
+
+  @Override
+  @Before
+  public void setUpIngestionTestBase() throws IOException
+  {
+    EasyMock.reset(supervisorManager);
+    EasyMock.expect(supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(WIKI))
+            .andReturn(Optional.of(WIKI)).anyTimes();
+    super.setUpIngestionTestBase();
+    final TaskConfig taskConfig = new TaskConfigBuilder().build();
+    taskActionClientFactory = createActionClientFactory();
+    dummyTaskActionClient = taskActionClientFactory.create(NoopTask.create());
+
+    final WorkerConfig workerConfig = new WorkerConfig().setCapacity(10);
+    TaskRunner taskRunner = new ThreadingTaskRunner(
+        createToolboxFactory(taskConfig, taskActionClientFactory),
+        taskConfig,
+        workerConfig,
+        new NoopTaskLogs(),
+        getObjectMapper(),
+        new TestAppenderatorsManager(),
+        new MultipleFileTaskReportFileWriter(),
+        new DruidNode("middleManager", "host", false, 8091, null, true, false),
+        TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
+    );
+    taskQueue = new TaskQueue(
+        new TaskLockConfig(),
+        new TaskQueueConfig(null, new Period(0L), null, null, null),
+        new DefaultTaskConfig(),
+        getTaskStorage(),
+        taskRunner,
+        taskActionClientFactory,
+        getLockbox(),
+        new NoopServiceEmitter(),
+        getObjectMapper(),
+        new NoopTaskContextEnricher()
+    );
+    runningTasks.clear();
+    taskQueue.start();
+
+    groupId.set(0);
+    appendTask = createAndStartTask();
+    supervisorId = Capture.newInstance(CaptureType.ALL);
+    oldPendingSegment = Capture.newInstance(CaptureType.ALL);
+    newPendingSegment = Capture.newInstance(CaptureType.ALL);
+    EasyMock.expect(supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
+        EasyMock.capture(supervisorId),
+        EasyMock.capture(oldPendingSegment),
+        EasyMock.capture(newPendingSegment)
+    )).andReturn(true).anyTimes();
+    replaceTask = createAndStartTask();
+    EasyMock.replay(supervisorManager);
+    versionToIntervalToLoadSpecs = new HashMap<>();
+    parentSegmentToLoadSpec = new HashMap<>();
+  }
+
+  @After
+  public void tearDown()
+  {
+    verifyVersionIntervalLoadSpecUniqueness();
+    for (ActionsTestTask task : runningTasks) {
+      task.finishRunAndGetStatus();
+    }
+  }
+
+  @Test
+  public void testLockReplaceAllocateAppend()
+  {
+    final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+    final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(segmentV10.getVersion(), pendingSegment.getVersion());
+
+    final DataSegment segmentV11 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV11);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testLockAllocateAppendDayReplaceDay()
+  {
+    final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+    final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+
+    // Verify that the segment appended to v0 gets upgraded to v1
+    final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+                                              .shardSpec(new NumberedShardSpec(1, 1))
+                                              .version(v1).build();
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testLockAllocateReplaceDayAppendDay()
+  {
+    final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    // Verify that the segment appended to v0 gets upgraded to v1
+    final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+                                              .shardSpec(new NumberedShardSpec(1, 1))
+                                              .version(v1).build();
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testAllocateLockReplaceDayAppendDay()
+  {
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+    final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    // Verify that the segment appended to v0 gets upgraded to v1
+    final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+                                              .shardSpec(new NumberedShardSpec(1, 1))
+                                              .version(v1).build();
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testAllocateLockAppendDayReplaceDay()
+  {
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+    final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+    replaceTask.finishRunAndGetStatus();
+
+    // Verify that the segment appended to v0 gets upgraded to v1
+    final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+                                              .shardSpec(new NumberedShardSpec(1, 1))
+                                              .version(v1).build();
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testAllocateAppendDayLockReplaceDay()
+  {
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+    final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+    final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+
+    // Verify that the segment appended to v0 gets fully overshadowed
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+  }
+
+  @Test
+  public void testLockReplaceMonthAllocateAppendDay()
+  {
+    String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+    final DataSegment segmentV10 = createSegment(JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+    verifyIntervalHasUsedSegments(JAN_23, segmentV10);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+    // Verify that the allocated segment takes the version and interval of the previous REPLACE
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(JAN_23, pendingSegment.getInterval());
+    Assert.assertEquals(v1, pendingSegment.getVersion());
+
+    final DataSegment segmentV11 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV11);
+
+    verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testLockAllocateAppendDayReplaceMonth()
+  {
+    final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+    final DataSegment segmentV10 = createSegment(JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+
+    // Verify that append segment gets upgraded to replace version
+    final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+                                              .version(v1)
+                                              .interval(segmentV10.getInterval())
+                                              .shardSpec(new NumberedShardSpec(1, 1))
+                                              .build();
+    verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testLockAllocateReplaceMonthAppendDay()
+  {
+    final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV10 = createSegment(JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    // Verify that append segment gets upgraded to replace version
+    final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+                                              .version(v1)
+                                              .interval(segmentV10.getInterval())
+                                              .shardSpec(new NumberedShardSpec(1, 1))
+                                              .build();
+    verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testAllocateLockReplaceMonthAppendDay()
+  {
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+    final DataSegment segmentV10 = createSegment(JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    // Verify that append segment gets upgraded to replace version
+    final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+                                              .version(v1)
+                                              .interval(segmentV10.getInterval())
+                                              .shardSpec(new NumberedShardSpec(1, 1))
+                                              .build();
+    verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testAllocateLockAppendDayReplaceMonth()
+  {
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+    final DataSegment segmentV10 = createSegment(JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+
+    // Verify that append segment gets upgraded to replace version
+    final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+                                              .version(v1)
+                                              .interval(segmentV10.getInterval())
+                                              .shardSpec(new NumberedShardSpec(1, 1))
+                                              .build();
+    verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testAllocateAppendDayLockReplaceMonth()
+  {
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+    final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+    final DataSegment segmentV10 = createSegment(JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+
+    // Verify that the old segment gets completely overshadowed by the REPLACE segment
+    verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV10);
+  }
+
+  @Test
+  public void testLockReplaceDayAllocateAppendMonth()
+  {
+    final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+    final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+    replaceTask.commitReplaceSegments(segmentV10);
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+    // Verify that an APPEND lock cannot be acquired on MONTH
+    TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
+    Assert.assertNull(appendLock);
+
+    // Verify that the new segment gets allocated with DAY granularity even though MONTH was requested
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertEquals(v1, pendingSegment.getVersion());
+    Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+
+    final DataSegment segmentV11 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV11);
+
+    verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testLockAllocateAppendMonthReplaceDay()
+  {
+    final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+    // Verify that an APPEND lock cannot be acquired on MONTH
+    TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
+    Assert.assertNull(appendLock);
+
+    // Verify that the segment is allocated with DAY granularity instead of the requested MONTH
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+    final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+
+    // Verify that append segment gets upgraded to replace version
+    final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+                                              .version(v1)
+                                              .interval(segmentV10.getInterval())
+                                              .shardSpec(new NumberedShardSpec(1, 1))
+                                              .build();
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testLockAllocateReplaceDayAppendMonth()
+  {
+    final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
+
+    // Verify that an APPEND lock cannot be acquired on MONTH
+    TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
+    Assert.assertNull(appendLock);
+
+    // Verify that the segment is allocated for DAY granularity instead of MONTH
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+
+    final DataSegment segmentV11 = DataSegment.builder(segmentV01)
+                                              .interval(FIRST_OF_JAN_23)
+                                              .version(v1)
+                                              .shardSpec(new NumberedShardSpec(1, 1))
+                                              .build();
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
+  }
+
+  @Test
+  public void testAllocateLockReplaceDayAppendMonth()
+  {
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertEquals(JAN_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    // Verify that a REPLACE lock cannot be acquired on DAY as MONTH is already locked
+    TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
+    Assert.assertNull(replaceLock);
+
+    // Verify that the segment cannot be committed as the replace task holds no lock
+    final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, SEGMENT_V0);
+    final ISE exception = Assert.assertThrows(ISE.class, () -> commitReplaceSegments(segmentV10));
+    final Throwable throwable = Throwables.getRootCause(exception);
+    Assert.assertEquals(
+        StringUtils.format(
+            "Segments[[%s]] are not covered by locks[[]] for task[%s]",
+            segmentV10, replaceTask.getId()
+        ),
+        throwable.getMessage()
+    );
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    commitAppendSegments(segmentV01);
+    verifyIntervalHasUsedSegments(JAN_23, segmentV01);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
+  }
+
+  @Test
+  public void testAllocateAppendMonthLockReplaceDay()
+  {
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertEquals(JAN_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    appendTask.commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(JAN_23, segmentV01);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
+
+    // Verify that replace lock cannot be acquired on DAY as MONTH is already locked
+    final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
+    Assert.assertNull(replaceLock);
+  }
+
+  @Test
+  public void testLockAllocateDayReplaceMonthAllocateAppend()
+  {
+    final SegmentIdWithShardSpec pendingSegmentV0
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+
+    final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+    final DataSegment segmentV10 = createSegment(JAN_23, v1);
+    commitReplaceSegments(segmentV10);
+    verifyIntervalHasUsedSegments(JAN_23, segmentV10);
+
+    final SegmentIdWithShardSpec pendingSegmentV1
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+    Assert.assertEquals(segmentV10.getVersion(), pendingSegmentV1.getVersion());
+
+    final DataSegment segmentV00 = asSegment(pendingSegmentV0);
+    final DataSegment segmentV11 = asSegment(pendingSegmentV1);
+    Set<DataSegment> appendSegments = commitAppendSegments(segmentV00, segmentV11)
+                                                .getSegments();
+
+    Assert.assertEquals(3, appendSegments.size());
+    // Segment V11 is committed
+    Assert.assertTrue(appendSegments.remove(segmentV11));
+    // Segment V00 is also committed
+    Assert.assertTrue(appendSegments.remove(segmentV00));
+    // Segment V00 is upgraded to v1 with MONTH granularity at the time of commit, as V12
+    final DataSegment segmentV12 = Iterables.getOnlyElement(appendSegments);
+    Assert.assertEquals(v1, segmentV12.getVersion());
+    Assert.assertEquals(JAN_23, segmentV12.getInterval());
+    Assert.assertEquals(segmentV00.getLoadSpec(), segmentV12.getLoadSpec());
+
+    verifyIntervalHasUsedSegments(JAN_23, segmentV00, segmentV10, segmentV11, segmentV12);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, segmentV12);
+  }
+
+  @Nullable
+  private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments)
+  {
+    for (DataSegment segment : segments) {
+      if (version.equals(segment.getVersion())
+          && Objects.equals(segment.getLoadSpec(), loadSpec)) {
+        return segment;
+      }
+    }
+
+    return null;
+  }
+
+  private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
+  {
+    final SegmentId id = pendingSegment.asSegmentId();
+    return new DataSegment(
+        id,
+        Collections.singletonMap(id.toString(), id.toString()),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        pendingSegment.getShardSpec(),
+        null,
+        0,
+        0
+    );
+  }
+
+  private void verifyIntervalHasUsedSegments(Interval interval, DataSegment... expectedSegments)
+  {
+    verifySegments(interval, Segments.INCLUDING_OVERSHADOWED, expectedSegments);
+  }
+
+  private void verifyIntervalHasVisibleSegments(Interval interval, DataSegment... expectedSegments)
+  {
+    verifySegments(interval, Segments.ONLY_VISIBLE, expectedSegments);
+  }
+
+  private void verifySegments(Interval interval, Segments visibility, DataSegment... expectedSegments)
+  {
+    try {
+      Collection<DataSegment> allUsedSegments = dummyTaskActionClient.submit(
+          new RetrieveUsedSegmentsAction(
+              WIKI,
+              null,
+              ImmutableList.of(interval),
+              visibility
+          )
+      );
+      Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
+    }
+    catch (IOException e) {
+      throw new ISE(e, "Error while fetching used segments in interval[%s]", interval);
+    }
+  }
+
+  private void verifyInputSegments(Task task, Interval interval, DataSegment... expectedSegments)
+  {
+    try {
+      final TaskActionClient taskActionClient = taskActionClientFactory.create(task);
+      Collection<DataSegment> allUsedSegments = taskActionClient.submit(
+          new RetrieveUsedSegmentsAction(
+              WIKI,
+              Collections.singletonList(interval)
+          )
+      );
+      Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
+    }
+    catch (IOException e) {
+      throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval);
+    }
+  }
+
+  private TaskToolboxFactory createToolboxFactory(
+      TaskConfig taskConfig,
+      TaskActionClientFactory taskActionClientFactory
+  )
+  {
+    TestTaskToolboxFactory.Builder builder = new TestTaskToolboxFactory.Builder()
+        .setConfig(taskConfig)
+        .setIndexIO(new IndexIO(getObjectMapper(), ColumnConfig.DEFAULT))
+        .setTaskActionClientFactory(taskActionClientFactory);
+    return new TestTaskToolboxFactory(builder)
+    {
+      @Override
+      public TaskToolbox build(TaskConfig config, Task task)
+      {
+        return createTaskToolbox(config, task, supervisorManager);
+      }
+    };
+  }
+
+  private DataSegment createSegment(Interval interval, String version)
+  {
+    SegmentId id = SegmentId.of(WIKI, interval, version, null);
+    return DataSegment.builder()
+                      .dataSource(WIKI)
+                      .interval(interval)
+                      .version(version)
+                      .loadSpec(Collections.singletonMap(id.toString(), id.toString()))
+                      .size(100)
+                      .build();
+  }
+
+  private ActionsTestTask createAndStartTask()
+  {
+    ActionsTestTask task = new ActionsTestTask(WIKI, "test_" + groupId.incrementAndGet(), taskActionClientFactory);
+    taskQueue.add(task);
+    runningTasks.add(task);
+    return task;
+  }
+
+  private void commitReplaceSegments(DataSegment... dataSegments)
+  {
+    replaceTask.commitReplaceSegments(dataSegments);
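+    // Replay each captured pending segment upgrade as an announcement on the append task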
+    for (int i = 0; i < supervisorId.getValues().size(); i++) {
+      announceUpgradedPendingSegment(oldPendingSegment.getValues().get(i), newPendingSegment.getValues().get(i));
+    }
+    supervisorId.reset();
+    oldPendingSegment.reset();
+    newPendingSegment.reset();
+    replaceTask.finishRunAndGetStatus();
+  }
+
+  private SegmentPublishResult commitAppendSegments(DataSegment... dataSegments)
+  {
+    SegmentPublishResult result = appendTask.commitAppendSegments(dataSegments);
+    result.getSegments().forEach(this::unannounceUpgradedPendingSegment);
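+    // Remember the load spec of each committed segment to verify the uniqueness of upgraded versions in tearDown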
+    for (DataSegment segment : dataSegments) {
+      parentSegmentToLoadSpec.put(segment.getId(), Iterables.getOnlyElement(segment.getLoadSpec().values()));
+    }
+    appendTask.finishRunAndGetStatus();
+    return result;
+  }
+
+  private void announceUpgradedPendingSegment(
+      SegmentIdWithShardSpec oldPendingSegment,
+      SegmentIdWithShardSpec newPendingSegment
+  )
+  {
+    appendTask.getAnnouncedSegmentsToParentSegments()
+              .put(newPendingSegment.asSegmentId(), oldPendingSegment.asSegmentId());
+  }
+
+  private void unannounceUpgradedPendingSegment(DataSegment segment)
+  {
+    appendTask.getAnnouncedSegmentsToParentSegments()
+              .remove(segment.getId());
+  }
+
+  private void verifyVersionIntervalLoadSpecUniqueness()
+  {
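+    // Assert that no two used segments (or announced pending segments) with the same
+    // version and interval share a load spec, i.e. every upgrade produced a distinct segment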
+    for (DataSegment usedSegment : getAllUsedSegments()) {
+      final String version = usedSegment.getVersion();
+      final Interval interval = usedSegment.getInterval();
+      final Object loadSpec = Iterables.getOnlyElement(usedSegment.getLoadSpec().values());
+      Map<Interval, Set<Object>> intervalToLoadSpecs
+          = versionToIntervalToLoadSpecs.computeIfAbsent(version, v -> new HashMap<>());
+      Set<Object> loadSpecs
+          = intervalToLoadSpecs.computeIfAbsent(interval, i -> new HashSet<>());
+      Assert.assertFalse(loadSpecs.contains(loadSpec));
+      loadSpecs.add(loadSpec);
+    }
+
+    for (Map.Entry<SegmentId, SegmentId> entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) {
+      final String version = entry.getKey().getVersion();
+      final Interval interval = entry.getKey().getInterval();
+      final Object loadSpec = parentSegmentToLoadSpec.get(entry.getValue());
+      Map<Interval, Set<Object>> intervalToLoadSpecs
+          = versionToIntervalToLoadSpecs.computeIfAbsent(version, v -> new HashMap<>());
+      Set<Object> loadSpecs
+          = intervalToLoadSpecs.computeIfAbsent(interval, i -> new HashSet<>());
+      Assert.assertFalse(loadSpecs.contains(loadSpec));
+      loadSpecs.add(loadSpec);
+    }
+  }
+
+  private Collection<DataSegment> getAllUsedSegments()
+  {
+    try {
+      return dummyTaskActionClient.submit(
+          new RetrieveUsedSegmentsAction(
+              WIKI,
+              null,
+              ImmutableList.of(Intervals.ETERNITY),
+              Segments.INCLUDING_OVERSHADOWED
+          )
+      );
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 307879f..999d4d0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -1896,14 +1896,17 @@
   }
 
   @Test
-  public void testUpgradeSegmentsCleanupOnUnlock()
+  public void testCleanupOnUnlock()
   {
-    final Task replaceTask = NoopTask.create();
-    final Task appendTask = NoopTask.create();
+    final Task replaceTask = NoopTask.forDatasource("replace");
+    final Task appendTask = NoopTask.forDatasource("append");
     final IndexerSQLMetadataStorageCoordinator coordinator
         = EasyMock.createMock(IndexerSQLMetadataStorageCoordinator.class);
     // Only the replaceTask should attempt a delete on the upgradeSegments table
     EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
+    // Any task may attempt cleanup of pending segments
+    EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(replaceTask.getId())).andReturn(0).once();
+    EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(appendTask.getId())).andReturn(0).once();
     EasyMock.replay(coordinator);
 
     final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 9ad3be6..f57494a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -30,6 +30,7 @@
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
@@ -175,7 +176,8 @@
   @Override
   public SegmentPublishResult commitAppendSegments(
       Set<DataSegment> appendSegments,
-      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
+      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+      String taskGroup
   )
   {
     return SegmentPublishResult.ok(commitSegments(appendSegments));
@@ -186,7 +188,8 @@
       Set<DataSegment> appendSegments,
       Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
       DataSourceMetadata startMetadata,
-      DataSourceMetadata endMetadata
+      DataSourceMetadata endMetadata,
+      String taskGroup
   )
   {
     return SegmentPublishResult.ok(commitSegments(appendSegments));
@@ -228,7 +231,8 @@
       Interval interval,
       PartialShardSpec partialShardSpec,
       String maxVersion,
-      boolean skipSegmentLineageCheck
+      boolean skipSegmentLineageCheck,
+      String taskAllocatorId
   )
   {
     return new SegmentIdWithShardSpec(
@@ -241,8 +245,7 @@
 
   @Override
   public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
-      Set<DataSegment> replaceSegments,
-      Set<String> activeBaseSequenceNames
+      Set<DataSegment> replaceSegments
   )
   {
     return Collections.emptyMap();
@@ -285,6 +288,18 @@
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public int deletePendingSegmentsForTaskGroup(final String taskGroup)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval)
+  {
+    throw new UnsupportedOperationException();
+  }
+
   public Set<DataSegment> getPublished()
   {
     return ImmutableSet.copyOf(published);
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index a889605..2390e7b 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.overlord;
 
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
@@ -238,7 +239,7 @@
    *                                identifier may have a version lower than this one, but will not have one higher.
    * @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
    *                                Should be set to false if replica tasks would index events in same order
-   *
+   * @param taskAllocatorId         The task allocator id with which the pending segment is associated
    * @return the pending segment identifier, or null if it was impossible to allocate a new segment
    */
   SegmentIdWithShardSpec allocatePendingSegment(
@@ -248,7 +249,8 @@
       Interval interval,
       PartialShardSpec partialShardSpec,
       String maxVersion,
-      boolean skipSegmentLineageCheck
+      boolean skipSegmentLineageCheck,
+      String taskAllocatorId
   );
 
   /**
@@ -322,10 +324,12 @@
    *                                   must be committed in a single transaction.
    * @param appendSegmentToReplaceLock Map from append segment to the currently
    *                                   active REPLACE lock (if any) covering it
+   * @param taskAllocatorId            allocator id of the task committing the segments to be appended
    */
   SegmentPublishResult commitAppendSegments(
       Set<DataSegment> appendSegments,
-      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
+      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+      String taskAllocatorId
   );
 
   /**
@@ -340,7 +344,8 @@
       Set<DataSegment> appendSegments,
       Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
       DataSourceMetadata startMetadata,
-      DataSourceMetadata endMetadata
+      DataSourceMetadata endMetadata,
+      String taskGroup
   );
 
   /**
@@ -373,13 +378,10 @@
    * </ul>
    *
    * @param replaceSegments Segments being committed by a REPLACE task
-   * @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task groups
-   *                                       of the supervisor (if any) for this datasource
    * @return Map from originally allocated pending segment to its new upgraded ID.
    */
   Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
-      Set<DataSegment> replaceSegments,
-      Set<String> activeRealtimeSequencePrefixes
+      Set<DataSegment> replaceSegments
   );
 
   /**
@@ -476,4 +478,19 @@
    * @return number of deleted entries from the metadata store
    */
   int deleteUpgradeSegmentsForTask(String taskId);
+
+  /**
+   * Deletes pending segments for a given task group after all the tasks belonging to it have completed.
+   * @param taskAllocatorId task id / task group / replica group for an appending task
+   * @return number of pending segments deleted from the metadata store
+   */
+  int deletePendingSegmentsForTaskGroup(String taskAllocatorId);
+
+  /**
+   * Fetches all the pending segments of the datasource that overlap with a given interval.
+   * @param datasource datasource to be queried
+   * @param interval interval with which segments overlap
+   * @return List of pending segment records
+   */
+  List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval);
 }
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
index b43e46d..49b31e5 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
@@ -38,18 +38,24 @@
   private final String sequenceName;
   private final String previousSegmentId;
   private final PartialShardSpec partialShardSpec;
+  private final String upgradedFromSegmentId;
+  private final String taskAllocatorId;
 
   public SegmentCreateRequest(
       String sequenceName,
       String previousSegmentId,
       String version,
-      PartialShardSpec partialShardSpec
+      PartialShardSpec partialShardSpec,
+      String upgradedFromSegmentId,
+      String taskAllocatorId
   )
   {
     this.sequenceName = sequenceName;
     this.previousSegmentId = previousSegmentId == null ? "" : previousSegmentId;
     this.version = version;
     this.partialShardSpec = partialShardSpec;
+    this.upgradedFromSegmentId = upgradedFromSegmentId;
+    this.taskAllocatorId = taskAllocatorId;
   }
 
   public String getSequenceName()
@@ -75,4 +81,14 @@
   {
     return partialShardSpec;
   }
+
+  public String getUpgradedFromSegmentId()
+  {
+    return upgradedFromSegmentId;
+  }
+
+  public String getTaskAllocatorId()
+  {
+    return taskAllocatorId;
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index d364299..e7567ba 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -57,7 +57,6 @@
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.apache.druid.timeline.partition.PartitionChunk;
@@ -94,7 +93,6 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -281,99 +279,74 @@
   }
 
   /**
-   * Fetches all the pending segments, whose interval overlaps with the given
-   * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
-   * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
+   * Fetches from the metadata store all the pending segments whose interval overlaps with the given search interval.
    */
   @VisibleForTesting
-  Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
-      final Handle handle,
-      final String dataSource,
-      final Interval interval,
-      final Set<String> sequenceNamePrefixFilter
-  ) throws IOException
-  {
-    if (sequenceNamePrefixFilter.isEmpty()) {
-      return Collections.emptyMap();
-    }
-
-    final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
-    final List<String> sequenceNamePrefixConditions = new ArrayList<>();
-    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
-      sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE :prefix%d)", i));
-    }
-
-    String sql = "SELECT sequence_name, payload"
-                 + " FROM " + dbTables.getPendingSegmentsTable()
-                 + " WHERE dataSource = :dataSource"
-                 + " AND start < :end"
-                 + StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString())
-                 + " AND ( " + String.join(" OR ", sequenceNamePrefixConditions) + " )";
-
-    Query<Map<String, Object>> query = handle.createQuery(sql)
-                                             .bind("dataSource", dataSource)
-                                             .bind("start", interval.getStart().toString())
-                                             .bind("end", interval.getEnd().toString());
-
-    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
-      query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i) + "%");
-    }
-
-    final ResultIterator<PendingSegmentsRecord> dbSegments =
-        query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
-             .iterator();
-
-    final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = new HashMap<>();
-    while (dbSegments.hasNext()) {
-      PendingSegmentsRecord record = dbSegments.next();
-      final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
-
-      if (interval.overlaps(identifier.getInterval())) {
-        pendingSegmentToSequenceName.put(identifier, record.sequenceName);
-      }
-    }
-
-    dbSegments.close();
-
-    return pendingSegmentToSequenceName;
-  }
-
-  private Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+  List<PendingSegmentRecord> getPendingSegmentsForIntervalWithHandle(
       final Handle handle,
       final String dataSource,
       final Interval interval
-  ) throws IOException
+  )
   {
-    final ResultIterator<PendingSegmentsRecord> dbSegments =
-        handle.createQuery(
-            StringUtils.format(
-                // This query might fail if the year has a different number of digits
-                // See https://github.com/apache/druid/pull/11582 for a similar issue
-                // Using long for these timestamps instead of varchar would give correct time comparisons
-                "SELECT sequence_name, payload FROM %1$s"
-                + " WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start",
-                dbTables.getPendingSegmentsTable(), connector.getQuoteString()
-            )
-        )
-              .bind("dataSource", dataSource)
-              .bind("start", interval.getStart().toString())
-              .bind("end", interval.getEnd().toString())
-              .map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
-              .iterator();
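+    // Interval endpoints can be filtered in SQL only when their ISO datetime strings
+    // compare chronologically (see https://github.com/apache/druid/pull/11582 for a
+    // similar issue); otherwise, fetch all pending segments and filter by overlap below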
+    final boolean compareIntervalEndpointsAsStrings = Intervals.canCompareEndpointsAsStrings(interval);
 
-    final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = new HashMap<>();
-    while (dbSegments.hasNext()) {
-      PendingSegmentsRecord record = dbSegments.next();
-      final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
-
-      if (interval.overlaps(identifier.getInterval())) {
-        pendingSegmentToSequenceName.put(identifier, record.sequenceName);
-      }
+    String sql = "SELECT payload, sequence_name, sequence_prev_id, task_allocator_id, upgraded_from_segment_id"
+                 + " FROM " + dbTables.getPendingSegmentsTable()
+                 + " WHERE dataSource = :dataSource";
+    if (compareIntervalEndpointsAsStrings) {
+      sql = sql
+            + " AND start < :end"
+            + StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString());
     }
 
-    dbSegments.close();
+    Query<Map<String, Object>> query = handle.createQuery(sql)
+                                             .bind("dataSource", dataSource);
+    if (compareIntervalEndpointsAsStrings) {
+      query = query.bind("start", interval.getStart().toString())
+                   .bind("end", interval.getEnd().toString());
+    }
 
-    return pendingSegmentToSequenceName;
+    final ResultIterator<PendingSegmentRecord> pendingSegmentIterator =
+        query.map((index, r, ctx) -> PendingSegmentRecord.fromResultSet(r, jsonMapper))
+             .iterator();
+    final ImmutableList.Builder<PendingSegmentRecord> pendingSegments = ImmutableList.builder();
+    while (pendingSegmentIterator.hasNext()) {
+      final PendingSegmentRecord pendingSegment = pendingSegmentIterator.next();
+      if (compareIntervalEndpointsAsStrings || pendingSegment.getId().getInterval().overlaps(interval)) {
+        pendingSegments.add(pendingSegment);
+      }
+    }
+    pendingSegmentIterator.close();
+    return pendingSegments.build();
+  }
+
+  List<PendingSegmentRecord> getPendingSegmentsForTaskAllocatorIdWithHandle(
+      final Handle handle,
+      final String dataSource,
+      final String taskAllocatorId
+  )
+  {
+    String sql = "SELECT payload, sequence_name, sequence_prev_id, task_allocator_id, upgraded_from_segment_id"
+                 + " FROM " + dbTables.getPendingSegmentsTable()
+                 + " WHERE dataSource = :dataSource AND task_allocator_id = :task_allocator_id";
+
+    Query<Map<String, Object>> query = handle.createQuery(sql)
+                                             .bind("dataSource", dataSource)
+                                             .bind("task_allocator_id", taskAllocatorId);
+
+    final ResultIterator<PendingSegmentRecord> pendingSegmentRecords =
+        query.map((index, r, ctx) -> PendingSegmentRecord.fromResultSet(r, jsonMapper))
+             .iterator();
+
+    final List<PendingSegmentRecord> pendingSegments = new ArrayList<>();
+    while (pendingSegmentRecords.hasNext()) {
+      pendingSegments.add(pendingSegmentRecords.next());
+    }
+
+    pendingSegmentRecords.close();
+
+    return pendingSegments;
   }
 
   private SegmentTimeline getTimelineForIntervalsWithHandle(
@@ -503,9 +476,11 @@
             segmentsToInsert.addAll(
                 createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask)
             );
-            return SegmentPublishResult.ok(
+            SegmentPublishResult result = SegmentPublishResult.ok(
                 insertSegments(handle, segmentsToInsert)
             );
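+            // Upgrade all pending segments overlapping with the committed REPLACE segments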
+            upgradePendingSegmentsOverlappingWith(segmentsToInsert);
+            return result;
           },
           3,
           getSqlMetadataMaxRetry()
@@ -519,14 +494,16 @@
   @Override
   public SegmentPublishResult commitAppendSegments(
       final Set<DataSegment> appendSegments,
-      final Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
+      final Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+      final String taskAllocatorId
   )
   {
     return commitAppendSegmentsAndMetadataInTransaction(
         appendSegments,
         appendSegmentToReplaceLock,
         null,
-        null
+        null,
+        taskAllocatorId
     );
   }
 
@@ -535,14 +512,16 @@
       Set<DataSegment> appendSegments,
       Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
       DataSourceMetadata startMetadata,
-      DataSourceMetadata endMetadata
+      DataSourceMetadata endMetadata,
+      String taskAllocatorId
   )
   {
     return commitAppendSegmentsAndMetadataInTransaction(
         appendSegments,
         appendSegmentToReplaceLock,
         startMetadata,
-        endMetadata
+        endMetadata,
+        taskAllocatorId
     );
   }
 
@@ -645,7 +624,8 @@
       final Interval interval,
       final PartialShardSpec partialShardSpec,
       final String maxVersion,
-      final boolean skipSegmentLineageCheck
+      final boolean skipSegmentLineageCheck,
+      String taskAllocatorId
   )
   {
     Preconditions.checkNotNull(dataSource, "dataSource");
@@ -677,7 +657,8 @@
                 allocateInterval,
                 partialShardSpec,
                 maxVersion,
-                existingChunks
+                existingChunks,
+                taskAllocatorId
             );
           } else {
             return allocatePendingSegmentWithSegmentLineageCheck(
@@ -688,7 +669,8 @@
                 allocateInterval,
                 partialShardSpec,
                 maxVersion,
-                existingChunks
+                existingChunks,
+                taskAllocatorId
             );
           }
         }
@@ -697,8 +679,7 @@
 
   @Override
   public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
-      Set<DataSegment> replaceSegments,
-      Set<String> activeRealtimeSequencePrefixes
+      Set<DataSegment> replaceSegments
   )
   {
     if (replaceSegments.isEmpty()) {
@@ -717,7 +698,7 @@
 
     final String datasource = replaceSegments.iterator().next().getDataSource();
     return connector.retryWithHandle(
-        handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId, activeRealtimeSequencePrefixes)
+        handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId)
     );
   }
 
@@ -736,11 +717,10 @@
   private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegments(
       Handle handle,
       String datasource,
-      Map<Interval, DataSegment> replaceIntervalToMaxId,
-      Set<String> activeRealtimeSequencePrefixes
-  ) throws IOException
+      Map<Interval, DataSegment> replaceIntervalToMaxId
+  ) throws JsonProcessingException
   {
-    final Map<SegmentCreateRequest, SegmentIdWithShardSpec> newPendingSegmentVersions = new HashMap<>();
+    final List<PendingSegmentRecord> upgradedPendingSegments = new ArrayList<>();
     final Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> pendingSegmentToNewId = new HashMap<>();
 
     for (Map.Entry<Interval, DataSegment> entry : replaceIntervalToMaxId.entrySet()) {
@@ -751,15 +731,13 @@
       final int numCorePartitions = maxSegmentId.getShardSpec().getNumCorePartitions();
       int currentPartitionNumber = maxSegmentId.getShardSpec().getPartitionNum();
 
-      final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
-          = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval, activeRealtimeSequencePrefixes);
+      final List<PendingSegmentRecord> overlappingPendingSegments
+          = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval);
 
-      for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment
-          : overlappingPendingSegments.entrySet()) {
-        final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey();
-        final String pendingSegmentSequence = overlappingPendingSegment.getValue();
+      for (PendingSegmentRecord overlappingPendingSegment : overlappingPendingSegments) {
+        final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getId();
 
-        if (shouldUpgradePendingSegment(pendingSegmentId, pendingSegmentSequence, replaceInterval, replaceVersion)) {
+        if (shouldUpgradePendingSegment(overlappingPendingSegment, replaceInterval, replaceVersion)) {
           // Ensure unique sequence_name_prev_id_sha1 by setting
           // sequence_prev_id -> pendingSegmentId
           // sequence_name -> prefix + replaceVersion
@@ -769,14 +747,14 @@
               replaceVersion,
               new NumberedShardSpec(++currentPartitionNumber, numCorePartitions)
           );
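+          // Record the parent pending segment id and inherit its task allocator id so that
+          // the upgraded pending segment is committed and cleaned up along with its parent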
-          newPendingSegmentVersions.put(
-              new SegmentCreateRequest(
+          upgradedPendingSegments.add(
+              new PendingSegmentRecord(
+                  newId,
                   UPGRADED_PENDING_SEGMENT_PREFIX + replaceVersion,
                   pendingSegmentId.toString(),
-                  replaceVersion,
-                  NumberedPartialShardSpec.instance()
-              ),
-              newId
+                  pendingSegmentId.toString(),
+                  overlappingPendingSegment.getTaskAllocatorId()
+              )
           );
           pendingSegmentToNewId.put(pendingSegmentId, newId);
         }
@@ -787,33 +765,34 @@
     // includes hash of both sequence_name and prev_segment_id
     int numInsertedPendingSegments = insertPendingSegmentsIntoMetastore(
         handle,
-        newPendingSegmentVersions,
+        upgradedPendingSegments,
         datasource,
         false
     );
     log.info(
         "Inserted total [%d] new versions for [%d] pending segments.",
-        numInsertedPendingSegments, newPendingSegmentVersions.size()
+        numInsertedPendingSegments, upgradedPendingSegments.size()
     );
 
     return pendingSegmentToNewId;
   }
 
   private boolean shouldUpgradePendingSegment(
-      SegmentIdWithShardSpec pendingSegmentId,
-      String pendingSegmentSequenceName,
+      PendingSegmentRecord pendingSegment,
       Interval replaceInterval,
       String replaceVersion
   )
   {
-    if (pendingSegmentId.getVersion().compareTo(replaceVersion) >= 0) {
+    if (pendingSegment.getTaskAllocatorId() == null) {
       return false;
-    } else if (!replaceInterval.contains(pendingSegmentId.getInterval())) {
+    } else if (pendingSegment.getId().getVersion().compareTo(replaceVersion) >= 0) {
+      return false;
+    } else if (!replaceInterval.contains(pendingSegment.getId().getInterval())) {
       return false;
     } else {
       // Do not upgrade already upgraded pending segment
-      return pendingSegmentSequenceName == null
-             || !pendingSegmentSequenceName.startsWith(UPGRADED_PENDING_SEGMENT_PREFIX);
+      return pendingSegment.getSequenceName() == null
+             || !pendingSegment.getSequenceName().startsWith(UPGRADED_PENDING_SEGMENT_PREFIX);
     }
   }
 
@@ -826,7 +805,8 @@
       final Interval interval,
       final PartialShardSpec partialShardSpec,
       final String maxVersion,
-      final List<TimelineObjectHolder<String, DataSegment>> existingChunks
+      final List<TimelineObjectHolder<String, DataSegment>> existingChunks,
+      final String taskAllocatorId
   ) throws IOException
   {
     final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
@@ -896,7 +876,8 @@
         interval,
         previousSegmentIdNotNull,
         sequenceName,
-        sequenceNamePrevIdSha1
+        sequenceNamePrevIdSha1,
+        taskAllocatorId
     );
     return newIdentifier;
   }
@@ -947,7 +928,7 @@
     }
 
     // For each of the remaining requests, create a new segment
-    final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = createNewSegments(
+    final Map<SegmentCreateRequest, PendingSegmentRecord> createdSegments = createNewSegments(
         handle,
         dataSource,
         interval,
@@ -965,12 +946,14 @@
     // have difficulty with large unique keys (see https://github.com/apache/druid/issues/2319)
     insertPendingSegmentsIntoMetastore(
         handle,
-        createdSegments,
+        ImmutableList.copyOf(createdSegments.values()),
         dataSource,
         skipSegmentLineageCheck
     );
 
-    allocatedSegmentIds.putAll(createdSegments);
+    for (Map.Entry<SegmentCreateRequest, PendingSegmentRecord> entry : createdSegments.entrySet()) {
+      allocatedSegmentIds.put(entry.getKey(), entry.getValue().getId());
+    }
     return allocatedSegmentIds;
   }
 
@@ -1009,7 +992,8 @@
       final Interval interval,
       final PartialShardSpec partialShardSpec,
       final String maxVersion,
-      final List<TimelineObjectHolder<String, DataSegment>> existingChunks
+      final List<TimelineObjectHolder<String, DataSegment>> existingChunks,
+      final String taskAllocatorId
   ) throws IOException
   {
     final String sql = StringUtils.format(
@@ -1073,7 +1057,9 @@
     );
 
     // always insert empty previous sequence id
-    insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
+    insertPendingSegmentIntoMetastore(
+        handle, newIdentifier, dataSource, interval, "",
+        sequenceName, sequenceNamePrevIdSha1, taskAllocatorId
+    );
 
     log.info(
         "Created new pending segment[%s] for datasource[%s], sequence[%s], interval[%s].",
@@ -1281,7 +1267,8 @@
       Set<DataSegment> appendSegments,
       Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
       @Nullable DataSourceMetadata startMetadata,
-      @Nullable DataSourceMetadata endMetadata
+      @Nullable DataSourceMetadata endMetadata,
+      String taskAllocatorId
   )
   {
     verifySegmentsToCommit(appendSegments);
@@ -1291,16 +1278,38 @@
     }
 
     final String dataSource = appendSegments.iterator().next().getDataSource();
-    final Set<DataSegment> segmentIdsForNewVersions = connector.retryTransaction(
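+    // Fetch all pending segments of this task allocator id, including versions created by concurrent REPLACE upgrades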
+    final List<PendingSegmentRecord> segmentIdsForNewVersions = connector.retryTransaction(
         (handle, transactionStatus)
-            -> createNewIdsForAppendSegments(handle, dataSource, appendSegments),
+            -> getPendingSegmentsForTaskAllocatorIdWithHandle(handle, dataSource, taskAllocatorId),
         0,
         SQLMetadataConnector.DEFAULT_MAX_TRIES
     );
 
     // Create entries for all required versions of the append segments
     final Set<DataSegment> allSegmentsToInsert = new HashSet<>(appendSegments);
-    allSegmentsToInsert.addAll(segmentIdsForNewVersions);
+
+    final Map<String, DataSegment> segmentIdMap = new HashMap<>();
+    appendSegments.forEach(segment -> segmentIdMap.put(segment.getId().toString(), segment));
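+    // For every pending segment upgraded from one of the committed append segments,
+    // commit a copy of that segment with the upgraded segment id and shard spec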
+    segmentIdsForNewVersions.forEach(
+        pendingSegment -> {
+          if (segmentIdMap.containsKey(pendingSegment.getUpgradedFromSegmentId())) {
+            final DataSegment oldSegment = segmentIdMap.get(pendingSegment.getUpgradedFromSegmentId());
+            allSegmentsToInsert.add(
+                new DataSegment(
+                    pendingSegment.getId().asSegmentId(),
+                    oldSegment.getLoadSpec(),
+                    oldSegment.getDimensions(),
+                    oldSegment.getMetrics(),
+                    pendingSegment.getId().getShardSpec(),
+                    oldSegment.getLastCompactionState(),
+                    oldSegment.getBinaryVersion(),
+                    oldSegment.getSize()
+                )
+            );
+          }
+        }
+    );
 
     final AtomicBoolean metadataNotUpdated = new AtomicBoolean(false);
     try {
@@ -1341,31 +1350,27 @@
     }
   }
 
-  private int insertPendingSegmentsIntoMetastore(
+  @VisibleForTesting
+  int insertPendingSegmentsIntoMetastore(
       Handle handle,
-      Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments,
+      List<PendingSegmentRecord> pendingSegments,
       String dataSource,
       boolean skipSegmentLineageCheck
   ) throws JsonProcessingException
   {
     final PreparedBatch insertBatch = handle.prepareBatch(
         StringUtils.format(
-        "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
-        + "sequence_name_prev_id_sha1, payload) "
-        + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
-        + ":sequence_name_prev_id_sha1, :payload)",
-        dbTables.getPendingSegmentsTable(),
-        connector.getQuoteString()
-    ));
-
-    // Deduplicate the segment ids by inverting the map
-    Map<SegmentIdWithShardSpec, SegmentCreateRequest> segmentIdToRequest = new HashMap<>();
-    createdSegments.forEach((request, segmentId) -> segmentIdToRequest.put(segmentId, request));
+            "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
+            + "sequence_name_prev_id_sha1, payload, task_allocator_id, upgraded_from_segment_id) "
+            + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
+            + ":sequence_name_prev_id_sha1, :payload, :task_allocator_id, :upgraded_from_segment_id)",
+            dbTables.getPendingSegmentsTable(),
+            connector.getQuoteString()
+        ));
 
     final String now = DateTimes.nowUtc().toString();
-    for (Map.Entry<SegmentIdWithShardSpec, SegmentCreateRequest> entry : segmentIdToRequest.entrySet()) {
-      final SegmentCreateRequest request = entry.getValue();
-      final SegmentIdWithShardSpec segmentId = entry.getKey();
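+    // One batch row per pending segment, including the new lineage and allocator columns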
+    for (PendingSegmentRecord pendingSegment : pendingSegments) {
+      final SegmentIdWithShardSpec segmentId = pendingSegment.getId();
       final Interval interval = segmentId.getInterval();
 
       insertBatch.add()
@@ -1374,13 +1379,15 @@
                  .bind("created_date", now)
                  .bind("start", interval.getStart().toString())
                  .bind("end", interval.getEnd().toString())
-                 .bind("sequence_name", request.getSequenceName())
-                 .bind("sequence_prev_id", request.getPreviousSegmentId())
+                 .bind("sequence_name", pendingSegment.getSequenceName())
+                 .bind("sequence_prev_id", pendingSegment.getSequencePrevId())
                  .bind(
                      "sequence_name_prev_id_sha1",
-                     getSequenceNameAndPrevIdSha(request, segmentId, skipSegmentLineageCheck)
+                     pendingSegment.computeSequenceNamePrevIdSha1(skipSegmentLineageCheck)
                  )
-                 .bind("payload", jsonMapper.writeValueAsBytes(segmentId));
+                 .bind("payload", jsonMapper.writeValueAsBytes(segmentId))
+                 .bind("task_allocator_id", pendingSegment.getTaskAllocatorId())
+                 .bind("upgraded_from_segment_id", pendingSegment.getUpgradedFromSegmentId());
     }
     int[] updated = insertBatch.execute();
     return Arrays.stream(updated).sum();
@@ -1393,15 +1400,16 @@
       Interval interval,
       String previousSegmentId,
       String sequenceName,
-      String sequenceNamePrevIdSha1
+      String sequenceNamePrevIdSha1,
+      String taskAllocatorId
   ) throws JsonProcessingException
   {
     handle.createStatement(
         StringUtils.format(
             "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
-            + "sequence_name_prev_id_sha1, payload) "
+            + "sequence_name_prev_id_sha1, payload, task_allocator_id) "
             + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
-            + ":sequence_name_prev_id_sha1, :payload)",
+            + ":sequence_name_prev_id_sha1, :payload, :task_allocator_id)",
             dbTables.getPendingSegmentsTable(),
             connector.getQuoteString()
         )
@@ -1415,188 +1423,18 @@
           .bind("sequence_prev_id", previousSegmentId)
           .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)
           .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
+          .bind("task_allocator_id", taskAllocatorId)
           .execute();
   }
 
-  /**
-   * Creates new IDs for the given append segments if a REPLACE task started and
-   * finished after these append segments had already been allocated. The newly
-   * created IDs belong to the same interval and version as the segments committed
-   * by the REPLACE task.
-   */
-  private Set<DataSegment> createNewIdsForAppendSegments(
-      Handle handle,
-      String dataSource,
-      Set<DataSegment> segmentsToAppend
-  ) throws IOException
-  {
-    if (segmentsToAppend.isEmpty()) {
-      return Collections.emptySet();
-    }
-
-    final Set<Interval> appendIntervals = new HashSet<>();
-    final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new TreeMap<>();
-    for (DataSegment segment : segmentsToAppend) {
-      appendIntervals.add(segment.getInterval());
-      appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
-                             .add(segment);
-    }
-
-    // Fetch all used segments that overlap with any of the append intervals
-    final Collection<DataSegment> overlappingSegments = retrieveUsedSegmentsForIntervals(
-        dataSource,
-        new ArrayList<>(appendIntervals),
-        Segments.INCLUDING_OVERSHADOWED
-    );
-
-    final Map<String, Set<Interval>> overlappingVersionToIntervals = new HashMap<>();
-    final Map<Interval, Set<DataSegment>> overlappingIntervalToSegments = new HashMap<>();
-    for (DataSegment segment : overlappingSegments) {
-      overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
-                                   .add(segment.getInterval());
-      overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>())
-                                   .add(segment);
-    }
-
-    final Set<DataSegment> upgradedSegments = new HashSet<>();
-    for (Map.Entry<String, Set<Interval>> entry : overlappingVersionToIntervals.entrySet()) {
-      final String upgradeVersion = entry.getKey();
-      Map<Interval, Set<DataSegment>> segmentsToUpgrade = getSegmentsWithVersionLowerThan(
-          upgradeVersion,
-          entry.getValue(),
-          appendVersionToSegments
-      );
-      for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry : segmentsToUpgrade.entrySet()) {
-        final Interval upgradeInterval = upgradeEntry.getKey();
-        final Set<DataSegment> segmentsAlreadyOnVersion
-            = overlappingIntervalToSegments.getOrDefault(upgradeInterval, Collections.emptySet())
-                                           .stream()
-                                           .filter(s -> s.getVersion().equals(upgradeVersion))
-                                           .collect(Collectors.toSet());
-        Set<DataSegment> segmentsUpgradedToVersion = createNewIdsForAppendSegmentsWithVersion(
-            handle,
-            upgradeVersion,
-            upgradeInterval,
-            upgradeEntry.getValue(),
-            segmentsAlreadyOnVersion
-        );
-        log.info("Upgraded [%d] segments to version[%s].", segmentsUpgradedToVersion.size(), upgradeVersion);
-        upgradedSegments.addAll(segmentsUpgradedToVersion);
-      }
-    }
-
-    return upgradedSegments;
-  }
-
-  /**
-   * Creates a Map from eligible interval to Set of segments that are fully
-   * contained in that interval and have a version strictly lower than {@code #cutoffVersion}.
-   */
-  private Map<Interval, Set<DataSegment>> getSegmentsWithVersionLowerThan(
-      String cutoffVersion,
-      Set<Interval> eligibleIntervals,
-      TreeMap<String, Set<DataSegment>> versionToSegments
-  )
-  {
-    final Set<DataSegment> eligibleSegments
-        = versionToSegments.headMap(cutoffVersion).values().stream()
-                           .flatMap(Collection::stream)
-                           .collect(Collectors.toSet());
-
-    final Map<Interval, Set<DataSegment>> eligibleIntervalToSegments = new HashMap<>();
-
-    for (DataSegment segment : eligibleSegments) {
-      final Interval segmentInterval = segment.getInterval();
-      for (Interval eligibleInterval : eligibleIntervals) {
-        if (eligibleInterval.contains(segmentInterval)) {
-          eligibleIntervalToSegments.computeIfAbsent(eligibleInterval, itvl -> new HashSet<>())
-                                    .add(segment);
-          break;
-        } else if (eligibleInterval.overlaps(segmentInterval)) {
-          // Committed interval overlaps only partially
-          throw new ISE(
-              "Committed interval[%s] conflicts with interval[%s] of append segment[%s].",
-              eligibleInterval, segmentInterval, segment.getId()
-          );
-        }
-      }
-    }
-
-    return eligibleIntervalToSegments;
-  }
-
-  /**
-   * Computes new segment IDs that belong to the upgradeInterval and upgradeVersion.
-   *
-   * @param committedSegments Segments that already exist in the upgradeInterval
-   *                          at upgradeVersion.
-   */
-  private Set<DataSegment> createNewIdsForAppendSegmentsWithVersion(
-      Handle handle,
-      String upgradeVersion,
-      Interval upgradeInterval,
-      Set<DataSegment> segmentsToUpgrade,
-      Set<DataSegment> committedSegments
-  ) throws IOException
-  {
-    // Find the committed segment with the highest partition number
-    SegmentIdWithShardSpec committedMaxId = null;
-    for (DataSegment committedSegment : committedSegments) {
-      if (committedMaxId == null
-          || committedMaxId.getShardSpec().getPartitionNum() < committedSegment.getShardSpec().getPartitionNum()) {
-        committedMaxId = SegmentIdWithShardSpec.fromDataSegment(committedSegment);
-      }
-    }
-
-    // Get pending segments for the new version to determine the next partition number to allocate
-    final String dataSource = segmentsToUpgrade.iterator().next().getDataSource();
-    final Set<SegmentIdWithShardSpec> pendingSegmentIds
-        = getPendingSegmentsForIntervalWithHandle(handle, dataSource, upgradeInterval).keySet();
-    final Set<SegmentIdWithShardSpec> allAllocatedIds = new HashSet<>(pendingSegmentIds);
-
-    // Create new IDs for each append segment
-    final Set<DataSegment> newSegmentIds = new HashSet<>();
-    for (DataSegment segment : segmentsToUpgrade) {
-      SegmentCreateRequest request = new SegmentCreateRequest(
-          segment.getId() + "__" + upgradeVersion,
-          null,
-          upgradeVersion,
-          NumberedPartialShardSpec.instance()
-      );
-
-      // Create new segment ID based on committed segments, allocated pending segments
-      // and new IDs created so far in this method
-      final SegmentIdWithShardSpec newId = createNewSegment(
-          request,
-          dataSource,
-          upgradeInterval,
-          upgradeVersion,
-          committedMaxId,
-          allAllocatedIds
-      );
-
-      // Update the set so that subsequent segment IDs use a higher partition number
-      allAllocatedIds.add(newId);
-      newSegmentIds.add(
-          DataSegment.builder(segment)
-                     .interval(newId.getInterval())
-                     .version(newId.getVersion())
-                     .shardSpec(newId.getShardSpec())
-                     .build()
-      );
-    }
-
-    return newSegmentIds;
-  }
-
-  private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(
+  private Map<SegmentCreateRequest, PendingSegmentRecord> createNewSegments(
       Handle handle,
       String dataSource,
       Interval interval,
       boolean skipSegmentLineageCheck,
       List<TimelineObjectHolder<String, DataSegment>> existingChunks,
       List<SegmentCreateRequest> requests
-  ) throws IOException
+  )
   {
     if (requests.isEmpty()) {
       return Collections.emptyMap();
@@ -1637,18 +1475,21 @@
     // across all shard specs (published + pending).
     // A pending segment having a higher partitionId must also be considered
     // to avoid clashes when inserting the pending segment created here.
-    final Set<SegmentIdWithShardSpec> pendingSegments =
-        new HashSet<>(getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet());
+    final Set<SegmentIdWithShardSpec> pendingSegments =
+        getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval)
+            .stream()
+            .map(PendingSegmentRecord::getId)
+            .collect(Collectors.toCollection(HashSet::new));
 
-    final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = new HashMap<>();
-    final Map<UniqueAllocateRequest, SegmentIdWithShardSpec> uniqueRequestToSegment = new HashMap<>();
+    final Map<SegmentCreateRequest, PendingSegmentRecord> createdSegments = new HashMap<>();
+    final Map<UniqueAllocateRequest, PendingSegmentRecord> uniqueRequestToSegment = new HashMap<>();
 
     for (SegmentCreateRequest request : requests) {
       // Check if the required segment has already been created in this batch
       final UniqueAllocateRequest uniqueRequest =
           new UniqueAllocateRequest(interval, request, skipSegmentLineageCheck);
 
-      final SegmentIdWithShardSpec createdSegment;
+      final PendingSegmentRecord createdSegment;
       if (uniqueRequestToSegment.containsKey(uniqueRequest)) {
         createdSegment = uniqueRequestToSegment.get(uniqueRequest);
       } else {
@@ -1663,9 +1504,9 @@
 
         // Add to pendingSegments to consider for partitionId
         if (createdSegment != null) {
-          pendingSegments.add(createdSegment);
+          pendingSegments.add(createdSegment.getId());
           uniqueRequestToSegment.put(uniqueRequest, createdSegment);
-          log.info("Created new segment[%s]", createdSegment);
+          log.info("Created new segment[%s]", createdSegment.getId());
         }
       }
 
@@ -1678,7 +1519,7 @@
     return createdSegments;
   }
 
-  private SegmentIdWithShardSpec createNewSegment(
+  private PendingSegmentRecord createNewSegment(
       SegmentCreateRequest request,
       String dataSource,
       Interval interval,
@@ -1731,12 +1572,19 @@
                                  : PartitionIds.ROOT_GEN_START_PARTITION_ID;
 
       String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
-      return new SegmentIdWithShardSpec(
+      SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec(
           dataSource,
           interval,
           version,
           partialShardSpec.complete(jsonMapper, newPartitionId, 0)
       );
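+      // Carry the request's lineage and allocator info into the persisted record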
+      return new PendingSegmentRecord(
+          pendingSegmentId,
+          request.getSequenceName(),
+          request.getPreviousSegmentId(),
+          request.getUpgradedFromSegmentId(),
+          request.getTaskAllocatorId()
+      );
 
     } else if (!overallMaxId.getInterval().equals(interval)) {
       log.warn(
@@ -1761,7 +1609,7 @@
       // When the core partitions have been dropped, using pending segments may lead to an incorrect state
      // where the chunk is believed to have core partitions and query results are incorrect.
 
-      return new SegmentIdWithShardSpec(
+      SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec(
           dataSource,
           interval,
           Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
@@ -1771,6 +1619,13 @@
               committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
           )
       );
+      return new PendingSegmentRecord(
+          pendingSegmentId,
+          request.getSequenceName(),
+          request.getPreviousSegmentId(),
+          request.getUpgradedFromSegmentId(),
+          request.getTaskAllocatorId()
+      );
     }
   }
 
@@ -1796,7 +1651,7 @@
       final PartialShardSpec partialShardSpec,
       final String existingVersion,
       final List<TimelineObjectHolder<String, DataSegment>> existingChunks
-  ) throws IOException
+  )
   {
     // max partitionId of published data segments which share the same partition space.
     SegmentIdWithShardSpec committedMaxId = null;
@@ -1830,7 +1685,9 @@
     // A pending segment having a higher partitionId must also be considered
     // to avoid clashes when inserting the pending segment created here.
     final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
-        getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet()
+        getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).stream()
+                                                                             .map(PendingSegmentRecord::getId)
+                                                                             .collect(Collectors.toSet())
     );
     if (committedMaxId != null) {
       pendings.add(committedMaxId);
@@ -2689,6 +2546,30 @@
   }
 
   @Override
+  public int deletePendingSegmentsForTaskGroup(final String pendingSegmentsGroup)
+  {
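+    // Enables active cleanup of pending segments once no active tasks remain
+    // for this task allocator id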
+    return connector.getDBI().inTransaction(
+        (handle, status) -> handle
+            .createStatement(
+                StringUtils.format(
+                    "DELETE FROM %s WHERE task_allocator_id = :task_allocator_id",
+                    dbTables.getPendingSegmentsTable()
+                )
+            )
+            .bind("task_allocator_id", pendingSegmentsGroup)
+            .execute()
+    );
+  }
+
+  @Override
+  public List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval)
+  {
+    return connector.retryWithHandle(
+        handle -> getPendingSegmentsForIntervalWithHandle(handle, datasource, interval)
+    );
+  }
+
+  @Override
   public int deleteUpgradeSegmentsForTask(final String taskId)
   {
     return connector.getDBI().inTransaction(
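
Note: a minimal sketch of the intended call path for the new delete method; the caller and names below are illustrative assumptions, not part of this patch.

    // Hypothetical caller, e.g. cleanup once the last task for an allocator id completes:
    final int deleted = metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId);
    log.info("Deleted [%d] pending segments for taskAllocatorId[%s].", deleted, taskAllocatorId);
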
diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
new file mode 100644
index 0000000..44c62bf
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.sql.ResultSet;
+
+/**
+ * Representation of a record in the pending segments table. <br/>
+ * Mapping of columns in the table to fields:
+ *
+ * <ul>
+ * <li> id -> id (unique identifier for the pending segment) </li>
+ * <li> sequence_name -> sequenceName (sequence name used for segment allocation) </li>
+ * <li> sequence_prev_id -> sequencePrevId (previous segment id used for segment allocation) </li>
+ * <li> upgraded_from_segment_id -> upgradedFromSegmentId (id of the root segment from which this was upgraded) </li>
+ * <li> task_allocator_id -> taskAllocatorId (associates a task / task group / replica group with the pending segment) </li>
+ * </ul>
+ */
+public class PendingSegmentRecord
+{
+  private final SegmentIdWithShardSpec id;
+  private final String sequenceName;
+  private final String sequencePrevId;
+  private final String upgradedFromSegmentId;
+  private final String taskAllocatorId;
+
+  public PendingSegmentRecord(
+      SegmentIdWithShardSpec id,
+      String sequenceName,
+      String sequencePrevId,
+      @Nullable String upgradedFromSegmentId,
+      @Nullable String taskAllocatorId
+  )
+  {
+    this.id = id;
+    this.sequenceName = sequenceName;
+    this.sequencePrevId = sequencePrevId;
+    this.upgradedFromSegmentId = upgradedFromSegmentId;
+    this.taskAllocatorId = taskAllocatorId;
+  }
+
+  public SegmentIdWithShardSpec getId()
+  {
+    return id;
+  }
+
+  public String getSequenceName()
+  {
+    return sequenceName;
+  }
+
+  public String getSequencePrevId()
+  {
+    return sequencePrevId;
+  }
+
+  /**
+   * Id of the original pending segment from which this segment was upgraded.
+   * Can be null for pending segments allocated before this column was added,
+   * or for segments that have not been upgraded.
+   */
+  @Nullable
+  public String getUpgradedFromSegmentId()
+  {
+    return upgradedFromSegmentId;
+  }
+
+  /**
+   * Id of the task / task group / replica group that allocated this segment.
+   * Can be null for pending segments allocated before this column was added.
+   */
+  @Nullable
+  public String getTaskAllocatorId()
+  {
+    return taskAllocatorId;
+  }
+
+  @SuppressWarnings("UnstableApiUsage")
+  public String computeSequenceNamePrevIdSha1(boolean skipSegmentLineageCheck)
+  {
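+    // Hash layout: sequence_name, a 0xff separator, then either the interval's start
+    // and end millis (when lineage checks are skipped) or the previous sequence id,
+    // another 0xff separator, and finally the segment version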
+    final Hasher hasher = Hashing.sha1().newHasher()
+                                 .putBytes(StringUtils.toUtf8(getSequenceName()))
+                                 .putByte((byte) 0xff);
+
+    if (skipSegmentLineageCheck) {
+      final Interval interval = getId().getInterval();
+      hasher
+          .putLong(interval.getStartMillis())
+          .putLong(interval.getEndMillis());
+    } else {
+      hasher
+          .putBytes(StringUtils.toUtf8(getSequencePrevId()));
+    }
+
+    hasher.putByte((byte) 0xff);
+    hasher.putBytes(StringUtils.toUtf8(getId().getVersion()));
+
+    return BaseEncoding.base16().encode(hasher.hash().asBytes());
+  }
+
+  public static PendingSegmentRecord fromResultSet(ResultSet resultSet, ObjectMapper jsonMapper)
+  {
+    try {
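+      // The payload column stores the JSON-serialized SegmentIdWithShardSpec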
+      final byte[] payload = resultSet.getBytes("payload");
+      return new PendingSegmentRecord(
+          jsonMapper.readValue(payload, SegmentIdWithShardSpec.class),
+          resultSet.getString("sequence_name"),
+          resultSet.getString("sequence_prev_id"),
+          resultSet.getString("upgraded_from_segment_id"),
+          resultSet.getString("task_allocator_id")
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
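
Note: a minimal usage sketch for fromResultSet; the handle, table, dataSource, and jsonMapper below are assumed to be in scope and are not part of this patch.

    // Illustrative only: map pending-segments rows to records (JDBI v2 style).
    final List<PendingSegmentRecord> records = handle
        .createQuery(StringUtils.format("SELECT * FROM %s WHERE dataSource = :dataSource", table))
        .bind("dataSource", dataSource)
        .map((index, resultSet, context) -> PendingSegmentRecord.fromResultSet(resultSet, jsonMapper))
        .list();
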
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index 6feaf9e..99d69f5 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -288,6 +288,7 @@
             )
         )
     );
+    alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName);
   }
 
   public void createDataSourceTable(final String tableName)
@@ -460,6 +461,26 @@
     }
   }
 
+  private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String tableName)
+  {
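+    // Each column is added only if it is missing, so this migration is safe to re-run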
+    List<String> statements = new ArrayList<>();
+    if (tableHasColumn(tableName, "upgraded_from_segment_id")) {
+      log.info("Table[%s] already has column[upgraded_from_segment_id].", tableName);
+    } else {
+      log.info("Adding column[upgraded_from_segment_id] to table[%s].", tableName);
+      statements.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN upgraded_from_segment_id VARCHAR(255)", tableName));
+    }
+    if (tableHasColumn(tableName, "task_allocator_id")) {
+      log.info("Table[%s] already has column[task_allocator_id].", tableName);
+    } else {
+      log.info("Adding column[task_allocator_id] to table[%s].", tableName);
+      statements.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN task_allocator_id VARCHAR(255)", tableName));
+    }
+    if (!statements.isEmpty()) {
+      alterTable(tableName, statements);
+    }
+  }
+
   public void createLogTable(final String tableName, final String entryTypeName)
   {
     createTable(
diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
index 33641a8..57e01d7 100644
--- a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
+++ b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
@@ -35,7 +35,9 @@
         "sequence",
         null,
         "version",
-        partialShardSpec
+        partialShardSpec,
+        null,
+        null
     );
     Assert.assertEquals("sequence", request.getSequenceName());
     Assert.assertEquals("", request.getPreviousSegmentId());
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 7b6fb4d..95ec1dd 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -25,8 +25,6 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.hash.Hashing;
-import com.google.common.io.BaseEncoding;
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -471,44 +469,6 @@
     );
   }
 
-  private Boolean insertPendingSegmentAndSequenceName(Pair<SegmentIdWithShardSpec, String> pendingSegmentSequenceName)
-  {
-    final SegmentIdWithShardSpec pendingSegment = pendingSegmentSequenceName.lhs;
-    final String sequenceName = pendingSegmentSequenceName.rhs;
-    final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable();
-    return derbyConnector.retryWithHandle(
-        handle -> {
-          handle.createStatement(
-                    StringUtils.format(
-                        "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
-                        + "sequence_name_prev_id_sha1, payload) "
-                        + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
-                        + ":sequence_name_prev_id_sha1, :payload)",
-                        table,
-                        derbyConnector.getQuoteString()
-                    )
-                )
-                .bind("id", pendingSegment.toString())
-                .bind("dataSource", pendingSegment.getDataSource())
-                .bind("created_date", DateTimes.nowUtc().toString())
-                .bind("start", pendingSegment.getInterval().getStart().toString())
-                .bind("end", pendingSegment.getInterval().getEnd().toString())
-                .bind("sequence_name", sequenceName)
-                .bind("sequence_prev_id", pendingSegment.toString())
-                .bind("sequence_name_prev_id_sha1", BaseEncoding.base16().encode(
-                    Hashing.sha1()
-                           .newHasher()
-                           .putLong((long) pendingSegment.hashCode() * sequenceName.hashCode())
-                           .hash()
-                           .asBytes()
-                ))
-                .bind("payload", mapper.writeValueAsBytes(pendingSegment))
-                .execute();
-          return true;
-        }
-    );
-  }
-
   private Map<String, String> getSegmentsCommittedDuringReplaceTask(String taskId)
   {
     final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getUpgradeSegmentsTable();
@@ -620,7 +580,7 @@
 
     // Commit the segment and verify the results
     SegmentPublishResult commitResult
-        = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock);
+        = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, "append");
     Assert.assertTrue(commitResult.isSuccess());
     Assert.assertEquals(appendSegments, commitResult.getSegments());
 
@@ -649,6 +609,30 @@
     final ReplaceTaskLock replaceLock = new ReplaceTaskLock("g1", Intervals.of("2023-01-01/2023-02-01"), "2023-02-01");
     final Set<DataSegment> segmentsAppendedWithReplaceLock = new HashSet<>();
     final Map<DataSegment, ReplaceTaskLock> appendedSegmentToReplaceLockMap = new HashMap<>();
+    final PendingSegmentRecord pendingSegmentInInterval = new PendingSegmentRecord(
+        new SegmentIdWithShardSpec(
+            "foo",
+            Intervals.of("2023-01-01/2023-01-02"),
+            "2023-01-02",
+            new NumberedShardSpec(100, 0)
+        ),
+        "",
+        "",
+        null,
+        "append"
+    );
+    final PendingSegmentRecord pendingSegmentOutsideInterval = new PendingSegmentRecord(
+        new SegmentIdWithShardSpec(
+            "foo",
+            Intervals.of("2023-04-01/2023-04-02"),
+            "2023-01-02",
+            new NumberedShardSpec(100, 0)
+        ),
+        "",
+        "",
+        null,
+        "append"
+    );
     for (int i = 1; i < 9; i++) {
       final DataSegment segment = new DataSegment(
           "foo",
@@ -665,6 +649,14 @@
       appendedSegmentToReplaceLockMap.put(segment, replaceLock);
     }
     insertUsedSegments(segmentsAppendedWithReplaceLock);
+    derbyConnector.retryWithHandle(
+        handle -> coordinator.insertPendingSegmentsIntoMetastore(
+            handle,
+            ImmutableList.of(pendingSegmentInInterval, pendingSegmentOutsideInterval),
+            "foo",
+            true
+        )
+    );
     insertIntoUpgradeSegmentsTable(appendedSegmentToReplaceLockMap);
 
     final Set<DataSegment> replacingSegments = new HashSet<>();
@@ -709,6 +701,25 @@
       }
       Assert.assertTrue(hasBeenCarriedForward);
     }
+
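+    // The pending segment within the replaced interval should now have an upgraded
+    // counterpart at the replace version; the one outside should remain untouched.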
+    List<PendingSegmentRecord> pendingSegmentsInInterval =
+        coordinator.getPendingSegments("foo", Intervals.of("2023-01-01/2023-02-01"));
+    Assert.assertEquals(2, pendingSegmentsInInterval.size());
+    final SegmentId rootPendingSegmentId = pendingSegmentInInterval.getId().asSegmentId();
+    if (pendingSegmentsInInterval.get(0).getUpgradedFromSegmentId() == null) {
+      Assert.assertEquals(rootPendingSegmentId, pendingSegmentsInInterval.get(0).getId().asSegmentId());
+      Assert.assertEquals(rootPendingSegmentId.toString(), pendingSegmentsInInterval.get(1).getUpgradedFromSegmentId());
+    } else {
+      Assert.assertEquals(rootPendingSegmentId, pendingSegmentsInInterval.get(1).getId().asSegmentId());
+      Assert.assertEquals(rootPendingSegmentId.toString(), pendingSegmentsInInterval.get(0).getUpgradedFromSegmentId());
+    }
+
+    List<PendingSegmentRecord> pendingSegmentsOutsideInterval =
+        coordinator.getPendingSegments("foo", Intervals.of("2023-04-01/2023-05-01"));
+    Assert.assertEquals(1, pendingSegmentsOutsideInterval.size());
+    Assert.assertEquals(
+        pendingSegmentOutsideInterval.getId().asSegmentId(), pendingSegmentsOutsideInterval.get(0).getId().asSegmentId()
+    );
   }
 
   @Test
@@ -2416,7 +2427,8 @@
         interval,
         partialShardSpec,
         "version",
-        false
+        false,
+        null
     );
 
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString());
@@ -2428,7 +2440,8 @@
         interval,
         partialShardSpec,
         identifier.getVersion(),
-        false
+        false,
+        null
     );
 
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString());
@@ -2440,7 +2453,8 @@
         interval,
         partialShardSpec,
         identifier1.getVersion(),
-        false
+        false,
+        null
     );
 
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString());
@@ -2452,7 +2466,8 @@
         interval,
         partialShardSpec,
         identifier1.getVersion(),
-        false
+        false,
+        null
     );
 
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier3.toString());
@@ -2465,7 +2480,8 @@
         interval,
         partialShardSpec,
         "version",
-        false
+        false,
+        null
     );
 
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_3", identifier4.toString());
@@ -2501,7 +2517,8 @@
         interval,
         partialShardSpec,
         "version",
-        true
+        true,
+        null
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString());
     // Since there are no used core partitions yet
@@ -2515,7 +2532,8 @@
         interval,
         partialShardSpec,
         maxVersion,
-        true
+        true,
+        null
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString());
     // Since there are no used core partitions yet
@@ -2529,7 +2547,8 @@
         interval,
         partialShardSpec,
         maxVersion,
-        true
+        true,
+        null
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString());
     // Since there are no used core partitions yet
@@ -2559,7 +2578,8 @@
         interval,
         partialShardSpec,
         maxVersion,
-        true
+        true,
+        null
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_1", identifier3.toString());
     // Used segment set has 1 core partition
@@ -2577,7 +2597,8 @@
         interval,
         partialShardSpec,
         maxVersion,
-        true
+        true,
+        null
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2", identifier4.toString());
     // Since all core partitions have been dropped
@@ -2610,7 +2631,8 @@
         interval,
         partialShardSpec,
         "A",
-        true
+        true,
+        null
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", identifier.toString());
     // Assume it publishes; create its corresponding segment
@@ -2638,7 +2660,8 @@
         interval,
         partialShardSpec,
         maxVersion,
-        true
+        true,
+        null
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier1.toString());
     // Assume it publishes; create its corresponding segment
@@ -2666,7 +2689,8 @@
         interval,
         partialShardSpec,
         maxVersion,
-        true
+        true,
+        null
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2", identifier2.toString());
     // Assume it publishes; create its corresponding segment
@@ -2720,7 +2744,8 @@
         interval,
         partialShardSpec,
         maxVersion,
-        true
+        true,
+        null
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B_1", identifier3.toString());
     // no corresponding segment, pending aborted
@@ -2754,7 +2779,8 @@
         interval,
         partialShardSpec,
         maxVersion,
-        true
+        true,
+        null
     );
     // maxid = B_1 -> new partno = 2
     // versionofexistingchunk=A
@@ -2791,7 +2817,7 @@
     final Interval interval = Intervals.of("2017-01-01/2017-02-01");
     final String sequenceName = "seq";
 
-    final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec);
+    final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null);
     final SegmentIdWithShardSpec segmentId0 = coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2802,7 +2828,7 @@
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString());
 
     final SegmentCreateRequest request1 =
-        new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec);
+        new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null, null);
     final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2813,7 +2839,7 @@
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString());
 
     final SegmentCreateRequest request2 =
-        new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec);
+        new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null);
     final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2824,7 +2850,7 @@
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString());
 
     final SegmentCreateRequest request3 =
-        new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec);
+        new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null);
     final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2836,7 +2862,7 @@
     Assert.assertEquals(segmentId2, segmentId3);
 
     final SegmentCreateRequest request4 =
-        new SegmentCreateRequest("seq1", null, "v1", partialShardSpec);
+        new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, null);
     final SegmentIdWithShardSpec segmentId4 = coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2880,7 +2906,8 @@
         interval,
         partialShardSpec,
         maxVersion,
-        true
+        true,
+        null
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString());
 
@@ -2905,7 +2932,8 @@
           interval,
           partialShardSpec,
           "version",
-          false
+          false,
+          null
       );
       prevSegmentId = identifier.toString();
     }
@@ -2920,7 +2948,8 @@
           interval,
           partialShardSpec,
           "version",
-          false
+          false,
+          null
       );
       prevSegmentId = identifier.toString();
     }
@@ -2947,7 +2976,8 @@
           interval,
           new NumberedOverwritePartialShardSpec(0, 1, (short) (i + 1)),
           "version",
-          false
+          false,
+          null
       );
       Assert.assertEquals(
           StringUtils.format(
@@ -3015,7 +3045,8 @@
         interval,
         partialShardSpec,
         "version",
-        true
+        true,
+        null
     );
 
     HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
@@ -3046,7 +3077,8 @@
         interval,
         partialShardSpec,
         "version",
-        true
+        true,
+        null
     );
 
     shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
@@ -3077,7 +3109,8 @@
         interval,
         new HashBasedNumberedPartialShardSpec(null, 2, 3, null),
         "version",
-        true
+        true,
+        null
     );
 
     shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
@@ -3124,7 +3157,8 @@
         interval,
         NumberedPartialShardSpec.instance(),
         version,
-        false
+        false,
+        null
     );
     Assert.assertNull(id);
   }
@@ -3169,7 +3203,8 @@
         interval,
         NumberedPartialShardSpec.instance(),
         version,
-        false
+        false,
+        null
     );
     Assert.assertNull(id);
   }
@@ -3330,64 +3365,6 @@
   }
 
   @Test
-  public void testGetPendingSegmentsForIntervalWithSequencePrefixes()
-  {
-    Pair<SegmentIdWithShardSpec, String> validIntervalValidSequence = Pair.of(
-        SegmentIdWithShardSpec.fromDataSegment(defaultSegment),
-        "validLOL"
-    );
-    insertPendingSegmentAndSequenceName(validIntervalValidSequence);
-
-    Pair<SegmentIdWithShardSpec, String> validIntervalInvalidSequence = Pair.of(
-        SegmentIdWithShardSpec.fromDataSegment(defaultSegment2),
-        "invalidRandom"
-    );
-    insertPendingSegmentAndSequenceName(validIntervalInvalidSequence);
-
-    Pair<SegmentIdWithShardSpec, String> invalidIntervalvalidSequence = Pair.of(
-        SegmentIdWithShardSpec.fromDataSegment(existingSegment1),
-        "validStuff"
-    );
-    insertPendingSegmentAndSequenceName(invalidIntervalvalidSequence);
-
-    Pair<SegmentIdWithShardSpec, String> twentyFifteenWithAnotherValidSequence = Pair.of(
-        new SegmentIdWithShardSpec(
-            existingSegment1.getDataSource(),
-            Intervals.of("2015/2016"),
-            "1970-01-01",
-            new NumberedShardSpec(1, 0)
-        ),
-        "alsoValidAgain"
-    );
-    insertPendingSegmentAndSequenceName(twentyFifteenWithAnotherValidSequence);
-
-    Pair<SegmentIdWithShardSpec, String> twentyFifteenWithInvalidSequence = Pair.of(
-        new SegmentIdWithShardSpec(
-            existingSegment1.getDataSource(),
-            Intervals.of("2015/2016"),
-            "1970-01-01",
-            new NumberedShardSpec(2, 0)
-        ),
-        "definitelyInvalid"
-    );
-    insertPendingSegmentAndSequenceName(twentyFifteenWithInvalidSequence);
-
-
-    final Map<SegmentIdWithShardSpec, String> expected = new HashMap<>();
-    expected.put(validIntervalValidSequence.lhs, validIntervalValidSequence.rhs);
-    expected.put(twentyFifteenWithAnotherValidSequence.lhs, twentyFifteenWithAnotherValidSequence.rhs);
-
-    final Map<SegmentIdWithShardSpec, String> actual =
-        derbyConnector.retryWithHandle(handle -> coordinator.getPendingSegmentsForIntervalWithHandle(
-            handle,
-            defaultSegment.getDataSource(),
-            defaultSegment.getInterval(),
-            ImmutableSet.of("valid", "alsoValid")
-        ));
-    Assert.assertEquals(expected, actual);
-  }
-
-  @Test
   public void testRetrieveUsedSegmentsAndCreatedDates()
   {
     insertUsedSegments(ImmutableSet.of(defaultSegment));
@@ -3471,7 +3448,8 @@
         interval,
         NumberedPartialShardSpec.instance(),
         "version",
-        false
+        false,
+        null
     );
 
     Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());
@@ -3525,7 +3503,8 @@
         interval,
         NumberedPartialShardSpec.instance(),
         "version",
-        false
+        false,
+        null
     );
 
     Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());