Update error messages when supervisor's checkpoint state is invalid (#16208)
* Update error message when topic messages.
Suggest resetting the supervisor when the topic changes instead of changing
the supervisor name which is actually making a new supervisor.
* Update server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
* Cleanup
* Remove log and include oldCommitMetadataFromDb
* Fix test
---------
Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index 6f8e827..8473547 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
@@ -151,7 +152,12 @@
);
Assert.assertEquals(
- SegmentPublishResult.fail("java.lang.RuntimeException: Failed to update the metadata Store. The new start metadata is ahead of last commited end state."),
+ SegmentPublishResult.fail(
+ InvalidInput.exception(
+ "The new start metadata state[ObjectMetadata{theObject=[1]}] is ahead of the last commited end"
+ + " state[null]. Try resetting the supervisor."
+ ).toString()
+ ),
result
);
}
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index e2addcc..d364299 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -33,6 +33,7 @@
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -445,41 +446,33 @@
try {
return connector.retryTransaction(
- new TransactionCallback<SegmentPublishResult>()
- {
- @Override
- public SegmentPublishResult inTransaction(
- final Handle handle,
- final TransactionStatus transactionStatus
- ) throws Exception
- {
- // Set definitelyNotUpdated back to false upon retrying.
- definitelyNotUpdated.set(false);
+ (handle, transactionStatus) -> {
+ // Set definitelyNotUpdated back to false upon retrying.
+ definitelyNotUpdated.set(false);
- if (startMetadata != null) {
- final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
- handle,
- dataSource,
- startMetadata,
- endMetadata
- );
+ if (startMetadata != null) {
+ final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
+ handle,
+ dataSource,
+ startMetadata,
+ endMetadata
+ );
- if (result.isFailed()) {
- // Metadata was definitely not updated.
- transactionStatus.setRollbackOnly();
- definitelyNotUpdated.set(true);
+ if (result.isFailed()) {
+ // Metadata was definitely not updated.
+ transactionStatus.setRollbackOnly();
+ definitelyNotUpdated.set(true);
- if (result.canRetry()) {
- throw new RetryTransactionException(result.getErrorMsg());
- } else {
- throw new RuntimeException(result.getErrorMsg());
- }
+ if (result.canRetry()) {
+ throw new RetryTransactionException(result.getErrorMsg());
+ } else {
+ throw InvalidInput.exception(result.getErrorMsg());
}
}
-
- final Set<DataSegment> inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments);
- return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
}
+
+ final Set<DataSegment> inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments);
+ return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
},
3,
getSqlMetadataMaxRetry()
@@ -2395,17 +2388,19 @@
}
final boolean startMetadataMatchesExisting;
- int startMetadataGreaterThanExisting = 0;
+ boolean startMetadataGreaterThanExisting = false;
if (oldCommitMetadataFromDb == null) {
startMetadataMatchesExisting = startMetadata.isValidStart();
- startMetadataGreaterThanExisting = 1;
+ startMetadataGreaterThanExisting = true;
} else {
// Checking against the last committed metadata.
- // If the new start sequence number is greater than the end sequence number of last commit compareTo() function will return 1,
- // 0 in all other cases. It might be because multiple tasks are publishing the sequence at around same time.
+ // If the new start sequence number is greater than the end sequence number of the last commit,
+ // compareTo() will return 1 and 0 in all other cases. This can happen if multiple tasks are publishing the
+ // sequence around the same time.
if (startMetadata instanceof Comparable) {
- startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata()).compareTo(oldCommitMetadataFromDb.asStartMetadata());
+ startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata())
+ .compareTo(oldCommitMetadataFromDb.asStartMetadata()) > 0;
}
// Converting the last one into start metadata for checking since only the same type of metadata can be matched.
@@ -2415,25 +2410,20 @@
startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
}
- if (startMetadataGreaterThanExisting == 1 && !startMetadataMatchesExisting) {
- // Offset stored in StartMetadata is Greater than the last commited metadata,
- // Then retry multiple task might be trying to publish the segment for same partitions.
- log.info("Failed to update the metadata Store. The new start metadata: [%s] is ahead of last commited end state: [%s].",
- startMetadata,
- oldCommitMetadataFromDb);
+ if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
+ // Offsets stored in startMetadata is greater than the last commited metadata.
return new DataStoreMetadataUpdateResult(true, false,
- "Failed to update the metadata Store. The new start metadata is ahead of last commited end state."
+ "The new start metadata state[%s] is ahead of the last commited"
+ + " end state[%s]. Try resetting the supervisor.", startMetadata, oldCommitMetadataFromDb
);
}
if (!startMetadataMatchesExisting) {
// Not in the desired start state.
- return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
- "Inconsistent metadata state. This can happen if you update input topic in a spec without changing " +
- "the supervisor name. Stored state: [%s], Target state: [%s].",
- oldCommitMetadataFromDb,
- startMetadata
- ));
+ return new DataStoreMetadataUpdateResult(true, false,
+ "Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.",
+ oldCommitMetadataFromDb, startMetadata
+ );
}
// Only endOffsets should be stored in metadata store
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 03de72b..7b6fb4d 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -28,6 +28,7 @@
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -935,7 +936,14 @@
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
- Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Failed to update the metadata Store. The new start metadata is ahead of last commited end state."), result1);
+ Assert.assertEquals(
+ SegmentPublishResult.fail(
+ InvalidInput.exception(
+ "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last commited"
+ + " end state[null]. Try resetting the supervisor."
+ ).toString()),
+ result1
+ );
// Should only be tried once.
Assert.assertEquals(1, metadataUpdateCounter.get());
@@ -956,10 +964,15 @@
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
- Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " +
- "happen if you update input topic in a spec without changing the supervisor name. " +
- "Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
- "Target state: [ObjectMetadata{theObject=null}]."), result2);
+ Assert.assertEquals(
+ SegmentPublishResult.fail(
+ InvalidInput.exception(
+ "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]"
+ + " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor."
+ ).toString()
+ ),
+ result2
+ );
// Should only be tried once per call.
Assert.assertEquals(2, metadataUpdateCounter.get());
@@ -1026,10 +1039,14 @@
new ObjectMetadata(ImmutableMap.of("foo", "qux")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
- Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " +
- "happen if you update input topic in a spec without changing the supervisor name. " +
- "Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
- "Target state: [ObjectMetadata{theObject={foo=qux}}]."), result2);
+ Assert.assertEquals(
+ SegmentPublishResult.fail(
+ InvalidInput.exception(
+ "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and "
+ + "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor."
+ ).toString()),
+ result2
+ );
// Should only be tried once per call.
Assert.assertEquals(2, metadataUpdateCounter.get());