Update error messages when supervisor's checkpoint state is invalid (#16208)

* Update error message when topic messages.

Suggest resetting the supervisor when the topic changes instead of changing
the supervisor name which is actually making a new supervisor.

* Update server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>

* Cleanup

* Remove log and include oldCommitMetadataFromDb

* Fix test

---------

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index 6f8e827..8473547 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
@@ -151,7 +152,12 @@
     );
 
     Assert.assertEquals(
-        SegmentPublishResult.fail("java.lang.RuntimeException: Failed to update the metadata Store. The new start metadata is ahead of last commited end state."),
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "The new start metadata state[ObjectMetadata{theObject=[1]}] is ahead of the last commited end"
+                + " state[null]. Try resetting the supervisor."
+            ).toString()
+        ),
         result
     );
   }
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index e2addcc..d364299 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -33,6 +33,7 @@
 import com.google.common.io.BaseEncoding;
 import com.google.inject.Inject;
 import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -445,41 +446,33 @@
 
     try {
       return connector.retryTransaction(
-          new TransactionCallback<SegmentPublishResult>()
-          {
-            @Override
-            public SegmentPublishResult inTransaction(
-                final Handle handle,
-                final TransactionStatus transactionStatus
-            ) throws Exception
-            {
-              // Set definitelyNotUpdated back to false upon retrying.
-              definitelyNotUpdated.set(false);
+          (handle, transactionStatus) -> {
+            // Set definitelyNotUpdated back to false upon retrying.
+            definitelyNotUpdated.set(false);
 
-              if (startMetadata != null) {
-                final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
-                    handle,
-                    dataSource,
-                    startMetadata,
-                    endMetadata
-                );
+            if (startMetadata != null) {
+              final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
+                  handle,
+                  dataSource,
+                  startMetadata,
+                  endMetadata
+              );
 
-                if (result.isFailed()) {
-                  // Metadata was definitely not updated.
-                  transactionStatus.setRollbackOnly();
-                  definitelyNotUpdated.set(true);
+              if (result.isFailed()) {
+                // Metadata was definitely not updated.
+                transactionStatus.setRollbackOnly();
+                definitelyNotUpdated.set(true);
 
-                  if (result.canRetry()) {
-                    throw new RetryTransactionException(result.getErrorMsg());
-                  } else {
-                    throw new RuntimeException(result.getErrorMsg());
-                  }
+                if (result.canRetry()) {
+                  throw new RetryTransactionException(result.getErrorMsg());
+                } else {
+                  throw InvalidInput.exception(result.getErrorMsg());
                 }
               }
-
-              final Set<DataSegment> inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments);
-              return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
             }
+
+            final Set<DataSegment> inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments);
+            return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
           },
           3,
           getSqlMetadataMaxRetry()
@@ -2395,17 +2388,19 @@
     }
 
     final boolean startMetadataMatchesExisting;
-    int startMetadataGreaterThanExisting = 0;
+    boolean startMetadataGreaterThanExisting = false;
 
     if (oldCommitMetadataFromDb == null) {
       startMetadataMatchesExisting = startMetadata.isValidStart();
-      startMetadataGreaterThanExisting = 1;
+      startMetadataGreaterThanExisting = true;
     } else {
       // Checking against the last committed metadata.
-      // If the new start sequence number is greater than the end sequence number of last commit compareTo() function will return 1,
-      // 0 in all other cases. It might be because multiple tasks are publishing the sequence at around same time.
+      // If the new start sequence number is greater than the end sequence number of the last commit,
+      // compareTo() will return 1 and 0 in all other cases. This can happen if multiple tasks are publishing the
+      // sequence around the same time.
       if (startMetadata instanceof Comparable) {
-        startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata()).compareTo(oldCommitMetadataFromDb.asStartMetadata());
+        startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata())
+                                               .compareTo(oldCommitMetadataFromDb.asStartMetadata()) > 0;
       }
 
       // Converting the last one into start metadata for checking since only the same type of metadata can be matched.
@@ -2415,25 +2410,20 @@
       startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
     }
 
-    if (startMetadataGreaterThanExisting == 1 && !startMetadataMatchesExisting) {
-      // Offset stored in StartMetadata is Greater than the last commited metadata,
-      // Then retry multiple task might be trying to publish the segment for same partitions.
-      log.info("Failed to update the metadata Store. The new start metadata: [%s] is ahead of last commited end state: [%s].",
-          startMetadata,
-          oldCommitMetadataFromDb);
+    if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
+      // Offsets stored in startMetadata is greater than the last commited metadata.
       return new DataStoreMetadataUpdateResult(true, false,
-          "Failed to update the metadata Store. The new start metadata is ahead of last commited end state."
+          "The new start metadata state[%s] is ahead of the last commited"
+          + " end state[%s]. Try resetting the supervisor.", startMetadata, oldCommitMetadataFromDb
       );
     }
 
     if (!startMetadataMatchesExisting) {
       // Not in the desired start state.
-      return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
-          "Inconsistent metadata state. This can happen if you update input topic in a spec without changing " +
-          "the supervisor name. Stored state: [%s], Target state: [%s].",
-          oldCommitMetadataFromDb,
-          startMetadata
-      ));
+      return new DataStoreMetadataUpdateResult(true, false,
+          "Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.",
+          oldCommitMetadataFromDb, startMetadata
+      );
     }
 
     // Only endOffsets should be stored in metadata store
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 03de72b..7b6fb4d 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -28,6 +28,7 @@
 import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.ObjectMetadata;
 import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -935,7 +936,14 @@
         new ObjectMetadata(ImmutableMap.of("foo", "bar")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Failed to update the metadata Store. The new start metadata is ahead of last commited end state."), result1);
+    Assert.assertEquals(
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last commited"
+                + " end state[null]. Try resetting the supervisor."
+            ).toString()),
+        result1
+    );
 
     // Should only be tried once.
     Assert.assertEquals(1, metadataUpdateCounter.get());
@@ -956,10 +964,15 @@
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " +
-        "happen if you update input topic in a spec without changing the supervisor name. " +
-        "Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
-        "Target state: [ObjectMetadata{theObject=null}]."), result2);
+    Assert.assertEquals(
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]"
+                + " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor."
+            ).toString()
+        ),
+        result2
+    );
 
     // Should only be tried once per call.
     Assert.assertEquals(2, metadataUpdateCounter.get());
@@ -1026,10 +1039,14 @@
         new ObjectMetadata(ImmutableMap.of("foo", "qux")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " +
-        "happen if you update input topic in a spec without changing the supervisor name. " +
-        "Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
-        "Target state: [ObjectMetadata{theObject={foo=qux}}]."), result2);
+    Assert.assertEquals(
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and "
+                + "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor."
+            ).toString()),
+        result2
+    );
 
     // Should only be tried once per call.
     Assert.assertEquals(2, metadataUpdateCounter.get());