SLING-8908 - Fixing error in merge
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index e8136c2..4518e7b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -206,7 +206,7 @@
}
}
- public void removePackage(PackageMessage pkgMsg, long offset) throws Exception {
+ public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
log.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
@@ -219,6 +219,14 @@
packageRetries.clear(pkgMsg.getPubAgentName());
context.stop();
}
+
+ public void skipPackage(long offset) throws LoginException, PersistenceException {
+ log.info(format("Skipping package at offset %s", offset));
+ try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+ storeOffset(resolver, offset);
+ resolver.commit();
+ }
+ }
public void sendStoredStatus() throws InterruptedException, IOException {
try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index c77d588..13b59b9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -46,6 +46,8 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.metrics.Timer;
@@ -299,19 +301,13 @@
private void handlePackageMessage(MessageInfo info, PackageMessage message) {
if (shouldEnqueue(message)) {
- try {
- DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
- enqueue(queueItem);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException();
- }
+ DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
+ enqueue(queueItem);
} else {
- try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
- storeOffset(resolver, info.getOffset());
- resolver.commit();
- } catch (LoginException | PersistenceException e) {
- LOG.warn("Error storing offset", e);
+ try {
+ bookKeeper.skipPackage(info.getOffset());
+ } catch (PersistenceException | LoginException e) {
+ LOG.info("Error marking message at offset {} as skipped", e);
}
}
}