Fix race conditions between segment merge/roll-up and purge (or convertToRawIndex) tasks: (#7427)

1. Add REFRESH_ONLY header for purge and convertToRawIndex tasks, segment upload api will abort the request if the segment does not exist or is deleted before the upload request is completed.
2. Honor segment lineage for convertToRawIndexTaskGenerator
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
index cd41c6e..b9b4092 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
@@ -44,7 +44,7 @@
   @Override
   public Set<String> preSelect(Set<String> onlineSegments) {
     SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, _tableNameWithType);
-    SegmentLineageUtils.filterSegmentsBasedOnLineageInplace(onlineSegments, segmentLineage);
+    SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(onlineSegments, segmentLineage);
     return onlineSegments;
   }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 9d667f2..6464e46 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -65,6 +65,7 @@
   private static final int NUM_BROKERS = 3;
   private static final int NUM_SERVERS = 1;
   private static final int NUM_OFFLINE_SEGMENTS = 5;
+  private static final int EXPECTED_VERSION = -1;
 
   private HelixBrokerStarter _brokerStarter;
 
@@ -215,7 +216,7 @@
         _helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentToRefresh);
     _helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
         SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, segmentToRefresh, newEndTime),
-        segmentZKMetadata, "downloadUrl", null);
+        segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null);
 
     TestUtils.waitForCondition(aVoid -> routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue()
         .equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update the time boundary for refreshed segment");
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
index 0a99509..458b247 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
@@ -43,7 +43,7 @@
    * Use the segment lineage metadata to filters out either merged segments or original segments in place
    * to make sure that the final segments contain no duplicate data.
    */
-  public static void filterSegmentsBasedOnLineageInplace(Set<String> segments, SegmentLineage segmentLineage) {
+  public static void filterSegmentsBasedOnLineageInPlace(Set<String> segments, SegmentLineage segmentLineage) {
     if (segmentLineage != null) {
       for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
         LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 0e3ce23..bcd71ae 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -85,6 +85,7 @@
 
   public static class CustomHeaders {
     public static final String UPLOAD_TYPE = "UPLOAD_TYPE";
+    public static final String REFRESH_ONLY = "REFRESH_ONLY";
     public static final String DOWNLOAD_URI = "DOWNLOAD_URI";
     public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER = "Pinot-SegmentZKMetadataCustomMapModifier";
     public static final String CRYPTER = "CRYPTER";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 9066ff5..f2c6c45 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -23,6 +23,8 @@
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -414,6 +416,11 @@
   @Path("/segments")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as json")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
   // We use this endpoint with URI upload because a request sent with the multipart content type will reject the POST
   // request if a multipart object is not sent. This endpoint does not move the segment to its final location;
   // it keeps it at the downloadURI header that is set. We will not support this endpoint going forward.
@@ -442,6 +449,11 @@
   @Path("/segments")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as binary")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
   // For the multipart endpoint, we will always move segment to final location regardless of the segment endpoint.
   public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
       @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
@@ -468,6 +480,11 @@
   @Path("/v2/segments")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as json")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
   // We use this endpoint with URI upload because a request sent with the multipart content type will reject the POST
   // request if a multipart object is not sent. This endpoint is recommended for use. It differs from the first
   // endpoint in how it moves the segment to a Pinot-determined final directory.
@@ -496,6 +513,11 @@
   @Path("/v2/segments")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as binary")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
   // This behavior does not differ from v1 of the same endpoint.
   public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
       @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 9d6bf01..b3a7258 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -65,7 +65,14 @@
     String segmentName = segmentMetadata.getName();
     ZNRecord segmentMetadataZNRecord =
         _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+    boolean refreshOnly =
+        Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
     if (segmentMetadataZNRecord == null) {
+      if (refreshOnly) {
+        throw new ControllerApplicationException(LOGGER,
+            "Cannot refresh non-existing segment, aborted uploading segment: " + segmentName + " of table: "
+                + tableNameWithType, Response.Status.GONE);
+      }
       LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, headers,
           crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation);
@@ -93,6 +100,7 @@
 
     SegmentZKMetadata existingSegmentZKMetadata = new SegmentZKMetadata(znRecord);
     long existingCrc = existingSegmentZKMetadata.getCrc();
+    int expectedVersion = znRecord.getVersion();
 
     // Check if CRC match when IF-MATCH header is set
     checkCRC(headers, tableNameWithType, segmentName, existingCrc);
@@ -118,10 +126,13 @@
       // Lock the segment by setting the upload start time in ZK
       existingSegmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
       if (!_pinotHelixResourceManager
-          .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, znRecord.getVersion())) {
+          .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, expectedVersion)) {
         throw new ControllerApplicationException(LOGGER,
             "Failed to lock the segment: " + segmentName + " of table: " + tableNameWithType + ", retry later",
             Response.Status.CONFLICT);
+      } else {
+        // The version will increment if the zk metadata update is successful
+        expectedVersion++;
       }
     }
 
@@ -156,7 +167,10 @@
         // (creation time is not included in the crc)
         existingSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
         existingSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
-        if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, existingSegmentZKMetadata)) {
+        // NOTE: in rare cases the segment can be deleted before the metadata is updated and the expected version won't
+        // match, we should fail the request for such cases
+        if (!_pinotHelixResourceManager
+            .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, expectedVersion)) {
           throw new RuntimeException(
               "Failed to update ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType);
         }
@@ -175,10 +189,12 @@
         }
 
         _pinotHelixResourceManager
-            .refreshSegment(tableNameWithType, segmentMetadata, existingSegmentZKMetadata, zkDownloadURI, crypter);
+            .refreshSegment(tableNameWithType, segmentMetadata, existingSegmentZKMetadata, expectedVersion,
+                zkDownloadURI, crypter);
       }
     } catch (Exception e) {
-      if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, existingSegmentZKMetadata)) {
+      if (!_pinotHelixResourceManager
+          .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, expectedVersion)) {
         LOGGER.error("Failed to update ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
       }
       throw e;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index cbc2651..25797c3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1788,7 +1788,7 @@
   }
 
   public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
-      SegmentZKMetadata segmentZKMetadata, String downloadUrl, @Nullable String crypter) {
+      SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl, @Nullable String crypter) {
     String segmentName = segmentMetadata.getName();
 
     // NOTE: Must first set the segment ZK metadata before trying to refresh because servers and brokers rely on segment
@@ -1800,7 +1800,8 @@
     segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
     segmentZKMetadata.setDownloadUrl(downloadUrl);
     segmentZKMetadata.setCrypterName(crypter);
-    if (!ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata)) {
+    if (!ZKMetadataProvider
+        .setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata, expectedVersion)) {
       throw new RuntimeException(
           "Failed to update ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType);
     }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index fee669e..335d8f4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -51,6 +51,7 @@
   private static final String TEST_TABLE_NAME = "validationTable";
   private static final String OFFLINE_TEST_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME);
   private static final String TEST_SEGMENT_NAME = "testSegment";
+  private static final int EXPECTED_VERSION = -1;
 
   private TableConfig _offlineTableConfig;
 
@@ -88,7 +89,7 @@
     }, 30_000L, "Failed to find the segment in the ExternalView");
     Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
     ControllerTestUtils.getHelixResourceManager()
-        .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, "downloadUrl", null);
+        .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null);
 
     segmentZKMetadata =
         ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME, TEST_SEGMENT_NAME);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 10966e7..7f3652a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -24,6 +24,7 @@
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -37,13 +38,21 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
+import org.apache.http.Header;
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.DataTable.MetadataKey;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.ServiceStatus;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
 import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
@@ -78,6 +87,7 @@
   private static final int NUM_SERVERS = 1;
   private static final int NUM_SEGMENTS = 12;
   private static final long ONE_HOUR_IN_MS = TimeUnit.HOURS.toMillis(1);
+  private static final String SEGMENT_UPLOAD_TEST_TABLE = "segmentUploadTestTable";
 
   // For table config refresh test, make an expensive query to ensure the query won't finish in 5ms
   private static final String TEST_TIMEOUT_QUERY =
@@ -324,6 +334,67 @@
     }
   }
 
+  @Test
+  public void testUploadSegmentRefreshOnly()
+      throws Exception {
+    TableConfig segmentUploadTestTableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(SEGMENT_UPLOAD_TEST_TABLE).setSchemaName(getSchemaName())
+            .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
+            .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
+            .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
+            .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
+            .setSegmentVersion(getSegmentVersion())
+            .setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
+            .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+            .setNullHandlingEnabled(getNullHandlingEnabled()).build();
+    addTableConfig(segmentUploadTestTableConfig);
+    String offlineTableName = segmentUploadTestTableConfig.getTableName();
+    File[] segmentTarFiles = _tarDir.listFiles();
+    assertNotNull(segmentTarFiles);
+    int numSegments = segmentTarFiles.length;
+    assertTrue(numSegments > 0);
+    List<Header> headers = new ArrayList<>();
+    headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY, "true"));
+    List<NameValuePair> parameters = new ArrayList<>();
+    NameValuePair tableNameParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
+        TableNameBuilder.extractRawTableName(offlineTableName));
+    parameters.add(tableNameParameter);
+
+    URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      // Refresh non-existing segment
+      File segmentTarFile = segmentTarFiles[0];
+      try {
+        fileUploadDownloadClient
+            .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, headers, parameters,
+                FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+        fail();
+      } catch (HttpErrorStatusException e) {
+        assertEquals(e.getStatusCode(), HttpStatus.SC_GONE);
+        assertTrue(_helixResourceManager.getSegmentsZKMetadata(SEGMENT_UPLOAD_TEST_TABLE).isEmpty());
+      }
+
+      // Upload segment
+      SimpleHttpResponse response = fileUploadDownloadClient
+          .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, null, parameters,
+              FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+      assertEquals(response.getStatusCode(), HttpStatus.SC_OK);
+      System.out.println(response.getResponse());
+      List<SegmentZKMetadata> segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(offlineTableName);
+      assertEquals(segmentsZKMetadata.size(), 1);
+
+      // Refresh existing segment
+      response = fileUploadDownloadClient
+          .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, headers, parameters,
+              FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+      assertEquals(response.getStatusCode(), HttpStatus.SC_OK);
+      segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(offlineTableName);
+      assertEquals(segmentsZKMetadata.size(), 1);
+      assertNotEquals(segmentsZKMetadata.get(0).getRefreshTime(), Long.MIN_VALUE);
+    }
+    dropOfflineTable(SEGMENT_UPLOAD_TEST_TABLE);
+  }
+
   @Test(dependsOnMethods = "testRangeIndexTriggering")
   public void testInvertedIndexTriggering()
       throws Exception {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index db329bd..646cc40 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -133,6 +133,9 @@
       // the newer segment won't get override
       Header ifMatchHeader = new BasicHeader(HttpHeaders.IF_MATCH, originalSegmentCrc);
 
+      // Only upload segment if it exists
+      Header refreshOnlyHeader = new BasicHeader(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY, "true");
+
       // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
       // NOTE: even segment is not changed, still need to upload the segment to update the segment ZK metadata so that
       // segment will not be submitted again
@@ -144,6 +147,7 @@
 
       List<Header> httpHeaders = new ArrayList<>();
       httpHeaders.add(ifMatchHeader);
+      httpHeaders.add(refreshOnlyHeader);
       httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
       httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));
 
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
index d0d9d12..5fc7c64 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
@@ -21,10 +21,13 @@
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.data.Segment;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
@@ -94,8 +97,16 @@
       String columnsToConvertConfig = taskConfigs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY);
 
       // Generate tasks
+      List<SegmentZKMetadata> offlineSegmentsZKMetadata = _clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName);
+      SegmentLineage segmentLineage = _clusterInfoAccessor.getSegmentLineage(offlineTableName);
+      Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
+      for (SegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentsZKMetadata) {
+        preSelectedSegmentsBasedOnLineage.add(offlineSegmentZKMetadata.getSegmentName());
+      }
+      SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage, segmentLineage);
+
       int tableNumTasks = 0;
-      for (SegmentZKMetadata segmentZKMetadata : _clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName)) {
+      for (SegmentZKMetadata segmentZKMetadata : offlineSegmentsZKMetadata) {
         // Generate up to tableMaxNumTasks tasks each time for each table
         if (tableNumTasks == tableMaxNumTasks) {
           break;
@@ -107,6 +118,12 @@
           continue;
         }
 
+        // Skip segments based on lineage: for COMPLETED lineage, segments in `segmentsFrom` will be removed by
+        // retention manager, for IN_PROGRESS lineage, segments in `segmentsTo` are uploaded yet
+        if (!preSelectedSegmentsBasedOnLineage.contains(segmentName)) {
+          continue;
+        }
+
         // Only submit segments that have not been converted
         Map<String, String> customMap = segmentZKMetadata.getCustomMap();
         if (customMap == null || !customMap.containsKey(
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index a1f559e..252505b 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -127,7 +127,7 @@
       for (SegmentZKMetadata segment : allSegments) {
         preSelectedSegmentsBasedOnLineage.add(segment.getSegmentName());
       }
-      SegmentLineageUtils.filterSegmentsBasedOnLineageInplace(preSelectedSegmentsBasedOnLineage, segmentLineage);
+      SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage, segmentLineage);
 
       List<SegmentZKMetadata> preSelectedSegments = new ArrayList<>();
       for (SegmentZKMetadata segment : allSegments) {