SLING-11181: Distinguish transient and permanent package import failure metrics (#105)

Co-authored-by: José Correia <josec@adobe.com>
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 3dd4e54..5e85f60 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -168,8 +168,8 @@
             // Execute the post-processor
             postProcess(pkgMsg);
 
-            packageRetries.clear(pkgMsg.getPubAgentName());
-             
+            clearPackageRetriesOnSuccess(pkgMsg);
+
             Event event = new AppliedEvent(pkgMsg, config.getSubAgentName()).toEvent();
             eventAdmin.postEvent(event);
             log.info("Imported distribution package {} at offset={}", pkgMsg, offset);
@@ -199,7 +199,7 @@
             storeOffset(resolver, offset);
             resolver.commit();
 
-            packageRetries.clear(pkgMsg.getPubAgentName());
+            clearPackageRetriesOnSuccess(pkgMsg);
 
             Event event = new AppliedEvent(pkgMsg, config.getSubAgentName()).toEvent();
             eventAdmin.postEvent(event);
@@ -259,6 +259,7 @@
         if (giveUp) {
             log.warn(msg, e);
             removeFailedPackage(pkgMsg, offset);
+            distributionMetricsService.getPermanentImportErrors().increment();
         } else {
             packageRetries.increase(pubAgentName);
             throw new DistributionException(msg, e);
@@ -368,6 +369,20 @@
         return packageRetries;
     }
 
+    /**
+     * This method clears the packageRetries storage for a given package and
+     * emits metrics on the success of the retry.
+     * @param pkgMsg: package distributed
+     */
+    public void clearPackageRetriesOnSuccess(PackageMessage pkgMsg) {
+        String pubAgentName = pkgMsg.getPubAgentName();
+        if (packageRetries.get(pubAgentName) > 0) {
+            distributionMetricsService.getTransientImportErrors().increment();
+        }
+
+        packageRetries.clear(pubAgentName);
+    }
+
     @Override
     public void close() throws IOException {
         IOUtils.closeQuietly(retriesGauge);
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index b118d53..00b6a9b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -103,6 +103,10 @@
 
     private Counter invalidationProcessRequest;
 
+    private Counter transientImportErrors;
+
+    private Counter permanentImportErrors;
+
     private BundleContext context;
 
     @Activate
@@ -133,6 +137,8 @@
         invalidationProcessDuration = getTimer(getMetricName(PUB_COMPONENT, "invalidation_process_duration"));
         invalidationProcessSuccess = getCounter(getMetricName(SUB_COMPONENT, "invalidation_process_success_count"));
         invalidationProcessRequest = getCounter(getMetricName(SUB_COMPONENT, "invalidation_process_request_count"));
+        transientImportErrors = getCounter(getMetricName(SUB_COMPONENT, "transient_import_errors"));
+        permanentImportErrors = getCounter(getMetricName(SUB_COMPONENT, "permanent_import_errors"));
     }
 
     /**
@@ -403,6 +409,10 @@
         return invalidationProcessRequest;
     }
 
+    public Counter getTransientImportErrors() { return transientImportErrors; }
+
+    public Counter getPermanentImportErrors() { return permanentImportErrors; }
+
     public class GaugeService<T> implements Gauge<T>, Closeable {
         
         @SuppressWarnings("rawtypes")
diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index 87b33e9..5ea7356 100644
--- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -48,6 +48,7 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import static org.mockito.Matchers.any;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.osgi.service.event.EventAdmin;
 
@@ -108,6 +109,13 @@
                 .thenReturn(mock(Timer.class));
         when(distributionMetricsService.getInvalidationProcessSuccess())
                 .thenReturn(mock(Counter.class));
+        when(distributionMetricsService.getTransientImportErrors())
+                .thenReturn(mock(Counter.class));
+        when(distributionMetricsService.getPermanentImportErrors())
+                .thenReturn(mock(Counter.class));
+        when(distributionMetricsService.getPackageStatusCounter(any(String.class)))
+                .thenReturn(mock(Counter.class));
+
         BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true, 10, PackageHandling.Extract, "package");
         bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender, logSender, bkConfig,
             importPostProcessor, invalidationProcessor);
@@ -131,10 +139,6 @@
 
     @Test
     public void testPackageImport() throws DistributionException {
-        when(distributionMetricsService.getPackageStatusCounter(
-                PackageStatusMessage.Status.IMPORTED.name())
-        ).thenReturn(mock(Counter.class));
-
         try {
             bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, currentTimeMillis());
         } finally {
@@ -144,10 +148,6 @@
 
     @Test
     public void testCacheInvalidation() throws DistributionException {
-        when(distributionMetricsService.getPackageStatusCounter(
-                PackageStatusMessage.Status.IMPORTED.name())
-        ).thenReturn(mock(Counter.class));
-
         try {
             bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE), 10);
         } finally {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 0fc92ef..b9153de 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -517,6 +517,10 @@
                 .thenReturn(timer);
         when(distributionMetricsService.getPackageDistributedDuration())
                 .thenReturn(timer);
+        when(distributionMetricsService.getTransientImportErrors())
+                .thenReturn(counter);
+        when(distributionMetricsService.getPermanentImportErrors())
+                .thenReturn(counter);
 
         when(distributionMetricsService.getImportPostProcessDuration())
             .thenReturn(timer);
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
index 516369c..818e9fc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
@@ -106,6 +106,8 @@
         assertNotNull(metrics.getRemovedPackageDuration());
         assertNotNull(metrics.getSendStoredStatusDuration());
         assertNotNull(metrics.getPackageStatusCounter("mockStatus"));
+        assertNotNull(metrics.getTransientImportErrors());
+        assertNotNull(metrics.getPermanentImportErrors());
     }
     
     @Test