diff --git a/pom.xml b/pom.xml
index 7669d24..b7af823 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.systemready</artifactId>
+            <version>0.4.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
             <artifactId>org.apache.felix.converter</artifactId>
             <version>1.0.0</version>
             <scope>test</scope>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index f5ac277..52b6a7a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -43,6 +43,7 @@
 import javax.annotation.ParametersAreNonnullByDefault;
 import javax.management.NotCompliantMBeanException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.impl.shared.AgentState;
@@ -136,6 +137,8 @@
 
     private JMXRegistration reg;
 
+    private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;
+
     public DistributionPublisher() {
         log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
         REQ_TYPES.put(ADD,    this::sendAndWait);
@@ -146,6 +149,7 @@
     @Activate
     public void activate(PublisherConfiguration config, BundleContext context) {
         requireNonNull(factory);
+        requireNonNull(distributionMetricsService);
         pubAgentName = requireNonNull(config.name());
 
         queuedTimeout = config.queuedTimeout();
@@ -167,6 +171,11 @@
         
         String msg = String.format("Started Publisher agent %s with packageBuilder %s, queuedTimeout %s",
                 pubAgentName, pkgType, queuedTimeout);
+        subscriberCountGauge = distributionMetricsService.createGauge(
+                DistributionMetricsService.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName,
+                "Current number of publish subscribers",
+                () -> discoveryService.getTopologyView().getSubscribedAgentIds().size()
+        );
         log.info(msg);
     }
 
@@ -176,6 +185,7 @@
         componentReg.unregister();
         String msg = String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s",
                 pubAgentName, pkgType, queuedTimeout);
+        IOUtils.closeQuietly(subscriberCountGauge);
         log.info(msg);
     }
     
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
index 23c09ff..16504e9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
@@ -46,7 +46,7 @@
 
     public static final String BASE_COMPONENT = "distribution.journal";
 
-    private static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
+    public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
 
     public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
     
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index a121693..ffc27bb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -24,29 +24,25 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
-import org.apache.sling.distribution.journal.MessageSender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @ParametersAreNonnullByDefault
 class Announcer implements Runnable, Closeable {
 
     private static final Logger LOG = LoggerFactory.getLogger(Announcer.class);
 
-    private final String topicName;
+    private final BookKeeper bookKeeper;
 
-    private final LocalStore offsetStore;
-
-    private final MessageSender<DiscoveryMessage> sender;
+    private final Consumer<DiscoveryMessage> sender;
 
     private final String subSlingId;
 
@@ -54,8 +50,6 @@
 
     private final Set<String> pubAgentNames;
 
-    private final PackageRetries packageRetries;
-
     private final boolean editable;
 
     private final int maxRetries;
@@ -64,21 +58,17 @@
 
     public Announcer(String subSlingId,
                      String subAgentName,
-                     String topicName,
                      Set<String> pubAgentNames,
-                     MessageSender<DiscoveryMessage> disSender,
-                     LocalStore offsetStore,
-                     PackageRetries packageRetries,
+                     Consumer<DiscoveryMessage> disSender,
+                     BookKeeper bookKeeper,
                      int maxRetries,
                      boolean editable,
                      int announceDelay) {
         this.subSlingId = Objects.requireNonNull(subSlingId);
         this.subAgentName = Objects.requireNonNull(subAgentName);
-        this.topicName = Objects.requireNonNull(topicName);
         this.pubAgentNames = Objects.requireNonNull(pubAgentNames);
         this.sender = Objects.requireNonNull(disSender);
-        this.offsetStore = Objects.requireNonNull(offsetStore);
-        this.packageRetries = Objects.requireNonNull(packageRetries);
+        this.bookKeeper = Objects.requireNonNull(bookKeeper);
         this.maxRetries = maxRetries;
         this.editable = editable;
         executor = Executors.newSingleThreadScheduledExecutor();
@@ -90,7 +80,7 @@
         LOG.debug("Sending discovery message for agent {}", subAgentName);
         try {
 
-            long offset = offsetStore.load("offset", -1L);
+            long offset = bookKeeper.loadOffset();
 
             SubscriberConfiguration subscriberConfiguration = SubscriberConfiguration.newBuilder()
                     .setEditable(editable)
@@ -102,18 +92,18 @@
                     .setSubAgentName(subAgentName)
                     .setSubscriberConfiguration(subscriberConfiguration);
             for (String pubAgentName : pubAgentNames) {
-                int retries = packageRetries.get(pubAgentName);
-                disMsgBuilder.addSubscriberState(createOffset(pubAgentName, offset, retries));
+                disMsgBuilder.addSubscriberState(subscriberState(pubAgentName, offset));
             }
 
-            sender.send(topicName, disMsgBuilder.build());
+            sender.accept(disMsgBuilder.build());
         } catch (Throwable e) {
             String msg = String.format("Failed to send discovery message for agent %s, %s", subAgentName, e.getMessage());
             LOG.info(msg, e);
         }
     }
 
-    private SubscriberState createOffset(String pubAgentName, long offset, int retries) {
+    private SubscriberState subscriberState(String pubAgentName, long offset) {
+        int retries = bookKeeper.getRetries(pubAgentName);
         return Messages.SubscriberState.newBuilder()
                 .setPubAgentName(pubAgentName)
                 .setRetries(retries)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
new file mode 100644
index 0000000..e8136c2
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.singletonMap;
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Keeps track of the offset and processed status, and
+ * coordinates the import/retry handling.
+ * 
+ * The offset store is identified by the agentName only.
+ *
+ * With non clustered publish instances deployment, each
+ * instance stores the offset in its own node store, thus
+ * avoiding mix ups. Moreover, when cloning an instance
+ * from a node store, the cloned instance will implicitly
+ * recover the offsets and start from the last processed
+ * offset.
+ *
+ * With clustered publish instances deployment, only one
+ * Subscriber agent must run on the cluster in order to
+ * avoid mix ups.
+ *
+ * The clustered and non clustered publish instances use
+ * cases can be supported by only running the Subscriber
+ * agent on the leader instance.
+ */
+public class BookKeeper implements Closeable {
+    private static final String SUBSERVICE_IMPORTER = "importer";
+    private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
+    private static final int RETRY_SEND_DELAY = 1000;
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+    private final ResourceResolverFactory resolverFactory;
+    private final DistributionMetricsService distributionMetricsService;
+    private final PackageHandler packageHandler;
+    private final EventAdmin eventAdmin;
+    private final Consumer<PackageStatusMessage> sender;
+    private final boolean editable;
+    private final int maxRetries;
+    private final boolean errorQueueEnabled;
+
+    private final PackageRetries packageRetries = new PackageRetries();
+    private final LocalStore statusStore;
+    private final LocalStore processedOffsets;
+    private final String subAgentName;
+    private final String subSlingId;
+    private GaugeService<Integer> retriesGauge;
+
+    public BookKeeper(ResourceResolverFactory resolverFactory, 
+            DistributionMetricsService distributionMetricsService,
+            PackageHandler packageHandler,
+            EventAdmin eventAdmin,
+            Consumer<PackageStatusMessage> sender,
+            String subAgentName,
+            String subSlingId,
+            boolean editable, 
+            int maxRetries) { 
+        this.packageHandler = packageHandler;
+        this.eventAdmin = eventAdmin;
+        String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
+        this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
+        this.resolverFactory = resolverFactory;
+        this.distributionMetricsService = distributionMetricsService;
+        this.sender = sender;
+        this.subAgentName = subAgentName;
+        this.subSlingId = subSlingId;
+        this.editable = editable;
+        this.maxRetries = maxRetries;
+        // Error queues are enabled when the number
+        // of retry attempts is limited ; disabled otherwise
+        this.errorQueueEnabled = (maxRetries >= 0);
+        this.statusStore = new LocalStore(resolverFactory, "statuses", subAgentName);
+        this.processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
+    }
+    
+    /**
+     * We aim at processing the packages exactly once. Processing the packages
+     * exactly once is possible with the following conditions
+     *
+     * I. The package importer is configured to disable auto-committing changes.
+     *
+     * II. A single commit aggregates three content updates
+     *
+     * C1. install the package 
+     * C2. store the processing status 
+     * C3. store the offset processed
+     *
+     * Some package importers require auto-saving or issue partial commits before
+     * failing. For those packages importers, we aim at processing packages at least
+     * once, thanks to the order in which the content updates are applied.
+     */
+    public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
+        log.info(format("Importing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(),
+                pkgMsg.getReqType(), offset));
+        addPackageMDC(pkgMsg);
+        try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
+                ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
+            packageHandler.apply(importerResolver, pkgMsg);
+            if (editable) {
+                storeStatus(importerResolver, new PackageStatus(IMPORTED, offset, pkgMsg.getPubAgentName()));
+            }
+            storeOffset(importerResolver, offset);
+            importerResolver.commit();
+            distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
+            distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
+            packageRetries.clear(pkgMsg.getPubAgentName());
+            Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
+            eventAdmin.postEvent(event);
+        } catch (LoginException | IOException | RuntimeException e) {
+            failure(pkgMsg, offset, e);
+        } finally {
+            MDC.clear();
+        }
+    }
+    
+    private void addPackageMDC(PackageMessage pkgMsg) {
+        MDC.put("module", "distribution");
+        MDC.put("package-id", pkgMsg.getPkgId());
+        String paths = pkgMsg.getPathsList().stream().collect(Collectors.joining(","));
+        MDC.put("paths", paths);
+        MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
+        String pubAgentName = pkgMsg.getPubAgentName();
+        MDC.put("pub-agent-name", pubAgentName);
+        MDC.put("distribution-message-type", pkgMsg.getReqType().name());
+        MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName)));
+        MDC.put("sub-sling-id", subSlingId);
+        MDC.put("sub-agent-name", subAgentName);
+    }
+    
+    /**
+     * Should be called on an exception while importing a package.
+     * 
+     * When we use an error queue and the max retries is reached the package is removed.
+     * In all other cases a DistributionException is thrown that signals that we should retry the
+     * package.
+     * 
+     * @param pkgMsg
+     * @param offset
+     * @param e
+     * @throws DistributionException if the package should be retried
+     */
+    private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
+        distributionMetricsService.getFailedPackageImports().mark();
+
+        String pubAgentName = pkgMsg.getPubAgentName();
+        int retries = packageRetries.get(pubAgentName);
+        if (errorQueueEnabled && retries >= maxRetries) {
+            log.warn(format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", pkgMsg.getPkgId(), offset, retries));
+            removeFailedPackage(pkgMsg, offset);
+        } else {
+            packageRetries.increase(pubAgentName);
+            String msg = format("Error processing distribution package %s. Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, errorQueueEnabled ? Integer.toString(maxRetries) : "infinite");
+            throw new DistributionException(msg, e);
+        }
+    }
+
+    public void removePackage(PackageMessage pkgMsg, long offset) throws Exception {
+        log.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+        Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
+        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+            if (editable) {
+                storeStatus(resolver, new PackageStatus(REMOVED, offset, pkgMsg.getPubAgentName()));
+            }
+            storeOffset(resolver, offset);
+            resolver.commit();
+        }
+        packageRetries.clear(pkgMsg.getPubAgentName());
+        context.stop();
+    }
+
+    public void sendStoredStatus() throws InterruptedException, IOException {
+        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+            PackageStatus status = new PackageStatus(statusStore.load());
+            boolean sent = status.sent;
+            for (int retry = 0 ; !sent ; retry++) {
+                try {
+                    sendStatusMessage(status, retry);
+                    markStatusSent();
+                    sent = true;
+                } catch (Exception e) {
+                    log.warn("Cannot send status (retry {})", retry, e);
+                    Thread.sleep(RETRY_SEND_DELAY);
+                }
+            }
+        }
+    }
+    
+    private void sendStatusMessage(PackageStatus status, int retry) throws InterruptedException {
+        PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
+                .setSubSlingId(subSlingId)
+                .setSubAgentName(subAgentName)
+                .setPubAgentName(status.pubAgentName)
+                .setOffset(status.offset)
+                .setStatus(status.status)
+                .build();
+        sender.accept(pkgStatMsg);
+        log.info("Sent status message {}",  pkgStatMsg);
+    }
+
+    public void markStatusSent() {
+        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+            statusStore.store(resolver, "sent", true);
+            resolver.commit();
+        } catch (Exception e) {
+            log.warn("Failed to mark status as sent", e);
+        }
+    }
+    
+    public long loadOffset() {
+        return  processedOffsets.load("offset", -1L);
+    }
+
+    public int getRetries(String pubAgentName) {
+        return packageRetries.get(pubAgentName);
+    }
+
+    public PackageRetries getPackageRetries() {
+        return packageRetries;
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOUtils.closeQuietly(retriesGauge);
+    }
+    
+    private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
+        log.info(format("Removing failed distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+        Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
+        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+            storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
+            storeOffset(resolver, offset);
+            resolver.commit();
+        } catch (Exception e) {
+            throw new DistributionException("Error removing failed package", e);
+        }
+        context.stop();
+    }
+
+    private void storeStatus(ResourceResolver resolver, PackageStatus packageStatus) throws PersistenceException {
+        Map<String, Object> statusMap = packageStatus.asMap();
+        statusStore.store(resolver, statusMap);
+        log.info("Stored status {}", statusMap);
+    }
+
+    private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
+        processedOffsets.store(resolver, "offset", offset);
+    }
+
+    private ResourceResolver getServiceResolver(String subService) throws LoginException { // caller closes the resolver
+        return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService)); // use the requested sub service, not a fixed one
+    }
+
+    class PackageStatus {
+        final Status status;
+        final Long offset;
+        final String pubAgentName;
+        final Boolean sent;
+
+        PackageStatus(Status status, long offset, String pubAgentName) {
+            this.status = status;
+            this.offset = offset;
+            this.pubAgentName = pubAgentName;
+            this.sent = false;
+        }
+        
+        PackageStatus(ValueMap statusMap) {
+            Integer statusNum = statusMap.get("statusNumber", Integer.class);
+            this.status = statusNum !=null ? Status.valueOf(statusNum) : null;
+            this.offset = statusMap.get("offset", Long.class);
+            this.pubAgentName = statusMap.get("pubAgentName", String.class);
+            this.sent = statusMap.get("sent", true);
+        }
+
+        Map<String, Object> asMap() {
+            Map<String, Object> s = new HashMap<>();
+            s.put("pubAgentName", pubAgentName);
+            s.put("statusNumber", status.getNumber());
+            s.put("offset", offset);
+            s.put("sent", sent);
+            return s;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
new file mode 100644
index 0000000..10186a7
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static java.lang.String.format;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CommandPoller implements Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(CommandPoller.class); // log under this class's own category
+
+    private final String subSlingId;
+    private final String subAgentName;
+    private final boolean editable;
+    private final Closeable commandPoller;
+    private final AtomicLong clearOffset = new AtomicLong(-1);
+
+    public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, boolean editable) {
+        this.subSlingId = subSlingId;
+        this.subAgentName = subAgentName;
+        this.editable = editable;
+        if (editable) {
+
+            /*
+             * We currently only support commands requiring editable mode.
+             * As an optimisation, we don't register a poller for non
+             * editable subscribers.
+             *
+             * When supporting commands independent from editable mode,
+             * this optimisation will be removed.
+             */
+
+            commandPoller = messagingProvider.createPoller(
+                    topics.getCommandTopic(),
+                    Reset.earliest,
+                    create(CommandMessage.class, this::handleCommandMessage));
+        } else {
+            commandPoller = null;
+        }
+    }
+    
+    public boolean isCleared(long offset) {
+        return offset <= clearOffset.longValue();
+    }
+
+    private void handleCommandMessage(MessageInfo info, CommandMessage message) {
+        if (!subSlingId.equals(message.getSubSlingId()) || !subAgentName.equals(message.getSubAgentName())) {
+            LOG.debug(format("Skip command for subSlingId %s", message.getSubSlingId()));
+            return;
+        }
+
+        if (message.hasClearCommand()) {
+            handleClearCommand(message.getClearCommand().getOffset());
+        } else {
+            LOG.warn("Unsupported command {}", message);
+        }
+    }
+
+    private void handleClearCommand(long offset) {
+        if (editable) {
+            updateClearOffsetIfLarger(offset);
+            LOG.info("Handled clear command for offset {}", offset);
+        } else {
+            LOG.warn("Unexpected ClearCommand for non editable subscriber");
+        }
+
+    }
+
+    private long updateClearOffsetIfLarger(long offset) {
+        return clearOffset.accumulateAndGet(offset, Math::max);
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(commandPoller);
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index e104a19..c77d588 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -18,27 +18,19 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
 import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
 import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
 import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
-import static java.lang.String.format;
-import static java.lang.System.currentTimeMillis;
-import static java.util.Arrays.asList;
-import static java.util.Collections.singletonMap;
-import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.toSet;
-import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
 
 import java.io.Closeable;
-import java.io.InputStream;
 import java.util.Collections;
 import java.util.Dictionary;
-import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -46,43 +38,40 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.jackrabbit.vault.packaging.Packaging;
-import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.DistributionRequestState;
 import org.apache.sling.distribution.DistributionRequestType;
 import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
-import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
+import org.apache.sling.distribution.journal.impl.shared.AgentState;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.log.spi.DistributionLog;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.apache.sling.serviceusermapping.ServiceUserMapped;
 import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
@@ -90,44 +79,28 @@
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.JournalAvailable;
-import org.apache.sling.distribution.journal.Reset;
+import com.google.protobuf.GeneratedMessage;
 
 /**
- * A Subscriber SCD agent which consumes messages produced by a {@code DistributionPublisher} agent.
+ * A Subscriber SCD agent which consumes messages produced by a
+ * {@code DistributionPublisher} agent.
  */
-@Component(
-        service = {},
-        immediate = true,
-        property = {"announceDelay=10000"},
-        configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
+@Component(service = {}, immediate = true, property = {
+        "announceDelay=10000" }, configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
 @Designate(ocd = SubscriberConfiguration.class, factory = true)
 @ParametersAreNonnullByDefault
 public class DistributionSubscriber implements DistributionAgent {
     private static final int PRECONDITION_TIMEOUT = 60;
-    private static final int RETRY_SEND_DELAY = 1000;
     static int RETRY_DELAY = 5000;
     static int QUEUE_FETCH_DELAY = 1000;
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
-    
+
     private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
 
     @Reference(name = "packageBuilder")
@@ -147,7 +120,7 @@
 
     @Reference
     private EventAdmin eventAdmin;
-    
+
     @Reference
     private JournalAvailable journalAvailable;
 
@@ -156,61 +129,40 @@
 
     @Reference
     private DistributionMetricsService distributionMetricsService;
-    
-    @Reference
-    private ServiceUserMapped mappedUser;
 
     @Reference
     private Packaging packaging;
     
+    @Reference
+    private SubscriberIdle subscriberIdle;
+    
     private ServiceRegistration<DistributionAgent> componentReg;
 
-    private final PackageRetries packageRetries = new PackageRetries();
-
-    private GaugeService<Integer> retriesGauge;
-
     private Closeable packagePoller;
 
-    private Closeable commandPoller;
+    private CommandPoller commandPoller;
 
-    private LocalStore processedOffsets;
+    private BookKeeper bookKeeper;
 
-    private LocalStore processedStatuses;
-
-
-    private final AtomicLong clearOffset = new AtomicLong(-1);
-
-    // Use a bounded internal buffer to allow reading further packages while working on one at a time
+    // Use a bounded internal buffer to allow reading further packages while working
+    // on one at a time
     private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new LinkedBlockingQueue<>(8);
 
     private Set<String> queueNames = Collections.emptySet();
 
-    private MessageSender<PackageStatusMessage> sender;
-
     private Announcer announcer;
 
     private String subAgentName;
 
-    private String subSlingId;
-
     private String pkgType;
 
-    private int maxRetries;
-
-    private boolean errorQueueEnabled;
-
-    private boolean editable;
-
     private volatile boolean running = true;
 
     private volatile Thread queueProcessor;
-    
-    private ContentPackageExtractor extractor;
-    
+
     @Activate
     public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
-
-        subSlingId = requireNonNull(slingSettings.getSlingId());
+        String subSlingId = requireNonNull(slingSettings.getSlingId());
         subAgentName = requireNonNull(config.name());
         requireNonNull(config);
         requireNonNull(context);
@@ -223,103 +175,46 @@
         requireNonNull(precondition);
 
         queueNames = getNotEmpty(config.agentNames());
+        int maxRetries = config.maxRetries();
+        boolean editable = config.editable();
 
-        maxRetries = config.maxRetries();
-        // Error queues are enabled when the number
-        // of retry attempts is limited ; disabled otherwise
-        errorQueueEnabled = (maxRetries >= 0);
-
-        editable = config.editable();
-
-        // The offset store is identified by the agentName only.
-        //
-        // With non clustered publish instances deployment, each
-        // instance stores the offset in its own node store, thus
-        // avoiding mix ups. Moreover, when cloning an instance
-        // from a node store, the cloned instance will implicitly
-        // recover the offsets and start from the last processed
-        // offset.
-        //
-        // With clustered publish instances deployment, only one
-        // Subscriber agent must run on the cluster in order to
-        // avoid mix ups.
-        //
-        // The clustered and non clustered publish instances use
-        // cases can be supported by only running the Subscriber
-        // agent on the leader instance.
-        processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
-        long startOffset = processedOffsets.load("offset", -1L) + 1;
-
-        processedStatuses = new LocalStore(resolverFactory, "statuses", subAgentName);
-
+        ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+        PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
+        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin,
+                sender(topics.getStatusTopic()), subAgentName, subSlingId, editable, maxRetries);
+        
+        long startOffset = bookKeeper.loadOffset() + 1;
         String assign = messagingProvider.assignTo(startOffset);
 
-        packagePoller = messagingProvider.createPoller(
-                topics.getPackageTopic(), 
-                Reset.earliest, 
-                assign,
+        packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign,
                 create(PackageMessage.class, this::handlePackageMessage));
 
-        if (editable) {
-
-            /*
-             * We currently only support commands requiring editable mode.
-             * As an optimisation, we don't register a poller for non
-             * editable subscribers.
-             *
-             * When supporting commands independent from editable mode,
-             * this optimisation will be removed.
-             */
-
-            commandPoller = messagingProvider.createPoller(
-                    topics.getCommandTopic(),
-                    Reset.earliest,
-                    create(CommandMessage.class, this::handleCommandMessage));
-        }
-
+        commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, editable);
 
         queueProcessor = startBackgroundThread(this::processQueue,
                 format("Queue Processor for Subscriber agent %s", subAgentName));
 
-        sender = messagingProvider.createSender();
-        
-        String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + config.name();
-        retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
-
         int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
-        MessageSender<DiscoveryMessage> disSender = messagingProvider.createSender();
-        announcer = new Announcer(subSlingId,
-                subAgentName,
-                topics.getDiscoveryTopic(),
-                queueNames,
-                disSender,
-                processedOffsets,
-                packageRetries,
-                maxRetries,
-                config.editable(),
-                announceDelay
-                );
+        announcer = new Announcer(subSlingId, subAgentName, queueNames, sender(topics.getDiscoveryTopic()), bookKeeper,
+                maxRetries, config.editable(), announceDelay);
 
         pkgType = requireNonNull(packageBuilder.getType());
-
-        String msg = format("Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s",
-                subAgentName,
-                startOffset,
-                queueNames,
-                pkgType,
-                config.editable(),
-                maxRetries,
-                errorQueueEnabled);
+        boolean errorQueueEnabled = (maxRetries >= 0);
+        String msg = format(
+                "Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s",
+                subAgentName, startOffset, queueNames, pkgType, config.editable(), maxRetries, errorQueueEnabled);
         LOG.info(msg);
         Dictionary<String, Object> props = createServiceProps(config);
         componentReg = context.registerService(DistributionAgent.class, this, props);
-        extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+    }
+
+    private <T extends GeneratedMessage> Consumer<T> sender(String topic) {
+        MessageSender<T> topicSender = messagingProvider.createSender();
+        return message -> topicSender.send(topic, message); // forward every accepted message to the given topic
+    }
 
     private Set<String> getNotEmpty(String[] agentNames) {
-        return asList(agentNames).stream()
-                .filter(StringUtils::isNotBlank)
-                .collect(toSet());
+        return asList(agentNames).stream().filter(StringUtils::isNotBlank).collect(toSet());
     }
 
     private Dictionary<String, Object> createServiceProps(SubscriberConfiguration config) {
@@ -338,8 +233,8 @@
 
     @Deactivate
     public void deactivate() {
-        IOUtils.closeQuietly(retriesGauge);
         IOUtils.closeQuietly(announcer);
+        IOUtils.closeQuietly(bookKeeper);
         componentReg.unregister();
         IOUtils.closeQuietly(packagePoller);
         IOUtils.closeQuietly(commandPoller);
@@ -348,10 +243,9 @@
         if (interrupter != null) {
             interrupter.interrupt();
         }
-        String msg = String.format("Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
-                subAgentName,
-                queueNames,
-                pkgType);
+        String msg = String.format(
+                "Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
+                subAgentName, queueNames, pkgType);
         LOG.info(msg);
     }
 
@@ -363,9 +257,9 @@
 
     @Override
     public DistributionQueue getQueue(@Nonnull String queueName) {
-        DistributionQueueItem head = queueItemsBuffer.stream()
-                .filter(item -> isIn(queueName, item)).findFirst().orElse(null);
-        return new SubQueue(queueName, head, packageRetries);
+        DistributionQueueItem head = queueItemsBuffer.stream().filter(item -> isIn(queueName, item)).findFirst()
+                .orElse(null);
+        return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
     }
 
     private boolean isIn(String queueName, DistributionQueueItem queueItem) {
@@ -378,7 +272,7 @@
     public DistributionLog getLog() {
         return this::emptyDistributionLog;
     }
-    
+
     private List<String> emptyDistributionLog() {
         return Collections.emptyList();
     }
@@ -391,8 +285,7 @@
 
     @Nonnull
     @Override
-    public DistributionResponse execute(ResourceResolver resourceResolver,
-                                        DistributionRequest request) {
+    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) {
         return executeUnsupported(request);
     }
 
@@ -424,11 +317,11 @@
     }
 
     private boolean shouldEnqueue(PackageMessage message) {
-        if (! queueNames.contains(message.getPubAgentName())) {
+        if (!queueNames.contains(message.getPubAgentName())) {
             LOG.info(String.format("Skipping package for Publisher agent %s (not subscribed)", message.getPubAgentName()));
             return false;
         }
-        if (! pkgType.equals(message.getPkgType())) {
+        if (!pkgType.equals(message.getPkgType())) {
             LOG.warn(String.format("Skipping package with type %s", message.getPkgType()));
             return false;
         }
@@ -436,25 +329,30 @@
     }
 
     /**
-     * We block here if the buffer is full in order to limit the number of
-	 * binary packages fetched in memory. Note that each queued item contains
-	 * the binary package to be imported.
+     * We block here if the buffer is full in order to limit the number of binary
+     * packages fetched in memory. Note that each queued item contains the binary
+     * package to be imported.
      */
-	private void enqueue(DistributionQueueItem queueItem) throws InterruptedException {
-        while (running) {
-            if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
-                distributionMetricsService.getItemsBufferSize().increment();
-                return;
+    private void enqueue(DistributionQueueItem queueItem) {
+        try {
+            while (running) {
+                if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
+                    distributionMetricsService.getItemsBufferSize().increment();
+                    return;
+                }
             }
+            throw new InterruptedException(); // running == false: handled like an interrupt
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag for the calling thread
+            throw new RuntimeException("Interrupted while buffering queue item", e);
         }
-        throw new InterruptedException();
-	}
+    }
 
     private void processQueue() {
         LOG.info("Started Queue processor");
-        while (! Thread.interrupted()) {
+        while (!Thread.interrupted()) {
             try {
-                processQueueItems();
+                fetchAndProcessQueueItem();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
@@ -462,18 +360,25 @@
         LOG.info("Stopped Queue processor");
     }
 
-    private void processQueueItems() throws InterruptedException {
+    private void fetchAndProcessQueueItem() throws InterruptedException {
         try {
             // send status stored in a previous run if exists
-            try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
-                sendStoredStatus();
-            }
+            bookKeeper.sendStoredStatus();
             // block until an item is available
             DistributionQueueItem item = blockingPeekQueueItem();
             // and then process it
+            subscriberIdle.busy();
             try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
                 processQueueItem(item);
+            } finally {
+                subscriberIdle.idle();
             }
+        } catch (IllegalStateException e) {
+            /*
+             * The precondition check timed out. This is not an error, so it is only logged at info level.
+             */
+            LOG.info(e.getMessage());
+            Thread.sleep(RETRY_DELAY);
         } catch (InterruptedException e) {
             throw e;
         } catch (Throwable t) {
@@ -496,264 +401,20 @@
 
     private void processQueueItem(DistributionQueueItem queueItem) throws Exception {
         long offset = queueItem.get(RECORD_OFFSET, Long.class);
-        boolean skip;
-        try {
-            skip = isCleared(offset) || cannotProcess(offset);
-        } catch (IllegalStateException e) {
-            /**
-             * This will occur when the precondition times out.
-             */
-            LOG.info(e.getMessage());
-            Thread.sleep(RETRY_DELAY);
-            return;
-        }
         PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
-        String pubAgentName = pkgMsg.getPubAgentName();
+        boolean skip = shouldSkip(offset);
         if (skip) {
-            removePackage(pkgMsg, offset);
+            bookKeeper.removePackage(pkgMsg, offset);
         } else {
             long createdTime = queueItem.get(RECORD_TIMESTAMP, Long.class);
-            importPackage(pkgMsg, offset, createdTime);
+            bookKeeper.importPackage(pkgMsg, offset, createdTime);
         }
-        queueItemProcessed(pubAgentName);
-    }
-
-    private void removePackage(PackageMessage pkgMsg, long offset) throws Exception {
-        LOG.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
-        Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
-        try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
-            if (editable) {
-                storeStatus(resolver, REMOVED, offset, pkgMsg.getPubAgentName());
-            }
-            storeOffset(resolver, offset);
-            resolver.commit();
-            context.stop();
-        }
-    }
-
-    private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws Exception {
-        LOG.info(format("Removing failed distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
-        Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
-        try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
-            storeStatus(resolver, REMOVED_FAILED, offset, pkgMsg.getPubAgentName());
-            storeOffset(resolver, offset);
-            resolver.commit();
-            context.stop();
-        }
-    }
-
-    private void importPackage(PackageMessage pkgMsg, long offset, long createdTime)
-            throws Exception {
-        String pubAgentName = pkgMsg.getPubAgentName();
-        LOG.info(format("Importing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
-        addPackageMDC(pkgMsg);
-        Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
-        try (ResourceResolver importerResolver = getServiceResolver("importer")) {
-
-            /*
-             * We aim at processing the packages exactly once.
-             * Processing the packages exactly once is possible
-             * with the following conditions
-             *
-             * I.  The package importer is configured to disable
-             *     auto-committing changes.
-             *
-             * II. A single commit aggregates three content updates
-             *
-             *     C1. install the package
-             *     C2. store the processing status
-             *     C3. store the offset processed
-             *
-             * Some package importers require auto-saving or issue
-             * partial commits before failing.
-             * For those packages importers, we aim at processing
-             * packages at least once, thanks to the order in which
-             * the content updates are applied.
-             */
-
-            installPackage(importerResolver, pkgMsg);
-            if (editable) {
-                storeStatus(importerResolver, IMPORTED, offset, pubAgentName);
-            }
-            storeOffset(importerResolver, offset);
-            importerResolver.commit();
-
-            context.stop();
-            distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
-            distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
-
-            Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
-            eventAdmin.postEvent(event);
-        } catch (Throwable e) {
-            distributionMetricsService.getFailedPackageImports().mark();
-            // rethrow fatal exceptions
-            if (e instanceof Error) {
-                throw (Error) e;
-            }
-            int retries = packageRetries.get(pubAgentName);
-            if (errorQueueEnabled && retries >= maxRetries) {
-                LOG.warn(format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", pkgMsg.getPkgId(), offset, retries));
-                removeFailedPackage(pkgMsg, offset);
-            } else {
-                packageRetries.increase(pubAgentName);
-                String msg = format("Error processing distribution package %s. Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, errorQueueEnabled ? Integer.toString(maxRetries) : "infinite");
-                throw new DistributionException(msg, e);
-            }
-        } finally {
-            MDC.clear();
-        }
-    }
-
-    private void storeOffset(ResourceResolver importerResolver, long offset)
-            throws PersistenceException {
-        processedOffsets.store(importerResolver, "offset", offset);
-    }
-
-    private void queueItemProcessed(String pubAgentName) {
-        packageRetries.clear(pubAgentName);
         queueItemsBuffer.remove();
         distributionMetricsService.getItemsBufferSize().decrement();
     }
 
-    private void addPackageMDC(PackageMessage pkgMsg) {
-        MDC.put("module", "distribution");
-        MDC.put("package-id", pkgMsg.getPkgId());
-        String paths = pkgMsg.getPathsList().stream().collect(Collectors.joining(","));
-        MDC.put("paths", paths);
-        MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
-        String pubAgentName = pkgMsg.getPubAgentName();
-        MDC.put("pub-agent-name", pubAgentName);
-        MDC.put("distribution-message-type", pkgMsg.getReqType().name());
-        MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName)));
-        MDC.put("sub-sling-id", subSlingId);
-        MDC.put("sub-agent-name", subAgentName);
-    }
-
-
-    private void installPackage(ResourceResolver resolver, PackageMessage pkgMsg)
-            throws DistributionException, PersistenceException {
-        PackageMessage.ReqType type = pkgMsg.getReqType();
-        switch (type) {
-            case ADD:
-                installAddPackage(resolver, pkgMsg);
-                break;
-            case DELETE:
-                installDeletePackage(resolver, pkgMsg);
-                break;
-            case TEST:
-                break;
-            default: throw new UnsupportedOperationException(format("Unable to process messages with type: %s", type));
-        }
-    }
-
-    private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
-            throws DistributionException {
-        LOG.info("Importing paths " + pkgMsg.getPathsList());
-        InputStream pkgStream = null;
-        try {
-            pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
-            packageBuilder.installPackage(resolver, pkgStream);
-            extractor.handle(resolver, pkgMsg.getPathsList());
-        } finally {
-            IOUtils.closeQuietly(pkgStream);
-        }
-
-    }
-
-    private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
-            throws PersistenceException {
-        LOG.info("Deleting paths " + pkgMsg.getPathsList());
-        for (String path : pkgMsg.getPathsList()) {
-            Resource resource = resolver.getResource(path);
-            if (resource != null) {
-                resolver.delete(resource);
-            }
-        }
-    }
-
-    private void storeStatus(ResourceResolver resolver, Status status, long offset, String pubAgentName) throws PersistenceException {
-        Map<String, Object> s = new HashMap<>();
-        s.put("pubAgentName", pubAgentName);
-        s.put("statusNumber", status.getNumber());
-        s.put("offset", offset);
-        s.put("sent", false);
-        processedStatuses.store(resolver, s);
-        LOG.info("Stored status {}", s);
-    }
-
-    private void sendStoredStatus() throws InterruptedException {
-        ValueMap status = processedStatuses.load();
-        boolean sent = status.get("sent", true);
-        for (int retry = 0 ; !sent ; retry++) {
-            try {
-                sendStatusMessage(status);
-                markStatusSent();
-                sent = true;
-            } catch (Exception e) {
-                LOG.warn("Cannot send status (retry {})", retry, e);
-                Thread.sleep(RETRY_SEND_DELAY);
-            }
-        }
-    }
-
-    private void markStatusSent() {
-        try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
-            processedStatuses.store(resolver, "sent", true);
-            resolver.commit();
-        } catch (Exception e) {
-            LOG.warn("Failed to mark status as sent", e);
-        }
-    }
-
-    private void sendStatusMessage(ValueMap status) {
-
-        PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
-                .setSubSlingId(subSlingId)
-                .setSubAgentName(subAgentName)
-                .setPubAgentName(status.get("pubAgentName", String.class))
-                .setOffset(status.get("offset", Long.class))
-                .setStatus(Status.valueOf(status.get("statusNumber", Integer.class)))
-                .build();
-
-        sender.send(topics.getStatusTopic(), pkgStatMsg);
-        LOG.info("Sent status message {}", status);
-    }
-
-    private void handleCommandMessage(MessageInfo info, CommandMessage message) {
-        if (subSlingId.equals(message.getSubSlingId()) && subAgentName.equals(message.getSubAgentName())) {
-            if (message.hasClearCommand()) {
-                handleClearCommand(message.getClearCommand().getOffset());
-            } else {
-                LOG.warn("Unsupported command {}", message);
-            }
-        } else {
-            LOG.debug(format("Skip command for subSlingId %s", message.getSubSlingId()));
-        }
-    }
-
-    private boolean isCleared(long offset) {
-        return offset <= clearOffset.longValue();
-    }
-
-    private boolean cannotProcess(long offset) {
-	    return !precondition.canProcess(offset , PRECONDITION_TIMEOUT);
-    }
-
-    private void handleClearCommand(long offset) {
-        if (editable) {
-            // atomically compare and set clearOffset
-            // as the max between the provided offset
-            // and the current clearOffset
-            clearOffset.accumulateAndGet(offset, Math::max);
-            LOG.info("Handled clear command for offset {}", offset);
-        } else {
-            LOG.warn("Unexpected ClearCommand for non editable subscriber");
-        }
-
-    }
-
-    private ResourceResolver getServiceResolver(String subService) throws LoginException {
-        return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
+    private boolean shouldSkip(long offset) throws IllegalStateException {
+        return commandPoller.isCleared(offset) || !precondition.canProcess(offset, PRECONDITION_TIMEOUT);
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
new file mode 100644
index 0000000..7307fdc
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static java.lang.String.format;
+
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Applies a single {@link PackageMessage} through the given resolver: installs the
+ * package for ADD requests, deletes the listed paths for DELETE requests and
+ * changes nothing for TEST requests.
+ */
+public class PackageHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(PackageHandler.class);
+
+    private final DistributionPackageBuilder packageBuilder;
+
+    private final ContentPackageExtractor extractor;
+
+    /**
+     * @param packageBuilder used to install the package stream of ADD requests
+     * @param extractor invoked with the message paths after a package was installed
+     */
+    public PackageHandler(DistributionPackageBuilder packageBuilder, ContentPackageExtractor extractor) {
+        this.packageBuilder = packageBuilder;
+        this.extractor = extractor;
+    }
+
+    /**
+     * Dispatches the message to the handling that matches its request type.
+     *
+     * @param resolver resolver used for all repository access; no commit is issued here
+     * @param pkgMsg message to apply
+     * @throws DistributionException if handling an ADD request fails
+     * @throws PersistenceException if deleting a path of a DELETE request fails
+     * @throws UnsupportedOperationException for any request type other than ADD, DELETE or TEST
+     */
+    public void apply(ResourceResolver resolver, PackageMessage pkgMsg)
+            throws DistributionException, PersistenceException {
+        PackageMessage.ReqType type = pkgMsg.getReqType();
+        switch (type) {
+            case ADD:
+                installAddPackage(resolver, pkgMsg);
+                break;
+            case DELETE:
+                installDeletePackage(resolver, pkgMsg);
+                break;
+            case TEST:
+                // No repository change for TEST requests
+                break;
+            default:
+                throw new UnsupportedOperationException(format("Unable to process messages with type: %s", type));
+        }
+    }
+
+    /**
+     * Installs the package stream of the message, then hands the message paths to the extractor.
+     */
+    private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
+            throws DistributionException {
+        LOG.info("Importing paths {}", pkgMsg.getPathsList());
+        InputStream pkgStream = null;
+        try {
+            pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
+            packageBuilder.installPackage(resolver, pkgStream);
+            extractor.handle(resolver, pkgMsg.getPathsList());
+        } finally {
+            // closeQuietly accepts null, which covers the case where pkgStream() itself threw
+            IOUtils.closeQuietly(pkgStream);
+        }
+    }
+
+    /**
+     * Deletes every listed path that currently resolves to a resource; unresolved paths are skipped.
+     */
+    private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
+            throws PersistenceException {
+        LOG.info("Deleting paths {}", pkgMsg.getPathsList());
+        for (String path : pkgMsg.getPathsList()) {
+            Resource resource = resolver.getResource(path);
+            if (resource != null) {
+                resolver.delete(resource);
+            }
+        }
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
new file mode 100644
index 0000000..7dac19c
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.io.Closeable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.felix.systemready.CheckStatus;
+import org.apache.felix.systemready.CheckStatus.State;
+import org.apache.felix.systemready.StateType;
+import org.apache.felix.systemready.SystemReadyCheck;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+
+/**
+ * System ready check that reports GREEN once {@link #idle()} has been followed by
+ * the configured idle time without an intervening {@link #busy()} at least once.
+ * After the check has turned GREEN it stays GREEN.
+ */
+@Component
+public class SubscriberIdle implements SystemReadyCheck, Closeable {
+    private static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
+
+    private final int idleMillis;
+    private final AtomicBoolean isReady = new AtomicBoolean();
+    private final ScheduledExecutorService executor;
+    // Pending transition to ready; only accessed from the synchronized methods
+    private ScheduledFuture<?> schedule;
+
+    public SubscriberIdle() {
+        this(DEFAULT_IDLE_TIME_MILLIS);
+    }
+
+    /**
+     * @param idleMillis delay in milliseconds between a call to idle() and the check turning GREEN
+     */
+    public SubscriberIdle(int idleMillis) {
+        this.idleMillis = idleMillis;
+        executor = Executors.newScheduledThreadPool(1);
+    }
+
+    @Override
+    public String getName() {
+        return "DistributionSubscriber idle";
+    }
+
+    @Override
+    public CheckStatus getStatus() {
+        State state = isReady.get() ? State.GREEN : State.RED;
+        return new CheckStatus(getName(), StateType.READY, state, "DistributionSubscriber idle");
+    }
+
+    /**
+     * Called when processing of a message starts.
+     * Cancels a pending transition to ready, if any.
+     */
+    public synchronized void busy() {
+        if (schedule != null) {
+            schedule.cancel(false);
+        }
+    }
+
+    /**
+     * Called when processing of a message has finished.
+     * Unless the check is already ready, restarts the idle timer.
+     */
+    public synchronized void idle() {
+        if (!isReady.get()) {
+            busy();
+            schedule = executor.schedule(this::ready, idleMillis, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void ready() {
+        isReady.set(true);
+    }
+
+    /**
+     * Shuts down the timer executor. Marked {@code @Deactivate} because Declarative
+     * Services otherwise only invokes a method named {@code deactivate}, which would
+     * leave the executor running after deactivation unless close() is called explicitly.
+     */
+    @Override
+    @Deactivate
+    public void close() {
+        executor.shutdownNow();
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
index e9ef03b..05f8a88 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
@@ -25,16 +25,13 @@
 import static org.mockito.Mockito.when;
 
 import java.util.Collections;
+import java.util.function.Consumer;
 
 import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.MessageSender;
-
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-
 public class AnnouncerTest {
 
     private static final String SUB1_SLING_ID = "sub1sling";
@@ -44,14 +41,13 @@
     @Test
     @SuppressWarnings("unchecked")
     public void testDiscoveryMessage() throws InterruptedException {
-        MessageSender<Messages.DiscoveryMessage> sender = Mockito.mock(MessageSender.class);
-        LocalStore offsetStore = Mockito.mock(LocalStore.class);
-        PackageRetries packageRetries = Mockito.mock(PackageRetries.class);
-        when(offsetStore.load("offset", -1L)).thenReturn(1l);
-        Announcer announcer = new Announcer(SUB1_SLING_ID, SUB1_AGENT_NAME, "discoverytopic", Collections.singleton(PUB1_AGENT_NAME), sender, offsetStore, packageRetries, -1, false, 10000);
+        Consumer<Messages.DiscoveryMessage> sender = Mockito.mock(Consumer.class);
+        BookKeeper bookKeeper = Mockito.mock(BookKeeper.class);
+        when(bookKeeper.loadOffset()).thenReturn(1l);
+        Announcer announcer = new Announcer(SUB1_SLING_ID, SUB1_AGENT_NAME, Collections.singleton(PUB1_AGENT_NAME), sender, bookKeeper, -1, false, 10000);
         Thread.sleep(200);
         ArgumentCaptor<Messages.DiscoveryMessage> msg = forClass(Messages.DiscoveryMessage.class);
-        verify(sender).send(Mockito.eq("discoverytopic"), msg.capture());
+        verify(sender).accept(msg.capture());
         Messages.DiscoveryMessage message = msg.getValue();
         Messages.SubscriberState offset = message.getSubscriberStateList().iterator().next();
         assertThat(message.getSubSlingId(), equalTo(SUB1_SLING_ID));
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
new file mode 100644
index 0000000..76ae4bc
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.felix.systemready.CheckStatus.State;
+import org.junit.After;
+import org.junit.Test;
+
+public class SubscriberIdleTest {
+
+    private SubscriberIdle idle;
+
+    /**
+     * Shuts down the executor of the check under test, also when an assertion failed.
+     */
+    @After
+    public void after() {
+        if (idle != null) {
+            idle.close();
+        }
+    }
+
+    @Test
+    public void testIdle() throws InterruptedException {
+        idle = new SubscriberIdle(40);
+        assertState("Initial state", State.RED);
+        idle.busy();
+        idle.idle();
+        assertState("State after reset", State.RED);
+        // Stay below the 40 ms idle limit
+        Thread.sleep(30);
+        assertState("State after time below idle limit", State.RED);
+        // busy() cancels the pending timer, so the following wait does not count as idle time
+        idle.busy();
+        Thread.sleep(80);
+        idle.idle();
+        assertState("State after long processing", State.RED);
+        // Exceed the idle limit without a further busy() call
+        Thread.sleep(80);
+        assertState("State after time over idle limit", State.GREEN);
+        idle.busy();
+        assertState("State should not be reset once it reached GREEN", State.GREEN);
+    }
+
+    private void assertState(String message, State expectedState) {
+        assertThat(message, idle.getStatus().getState(), equalTo(expectedState));
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index cf06393..6337c60 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -188,6 +188,9 @@
 
     @Mock
     private Timer.Context timerContext;
+    
+    @Mock
+    private SubscriberIdle subscriberIdle;
 
     @InjectMocks
     DistributionSubscriber subscriber;
@@ -200,7 +203,7 @@
     
     @Mock
     private ServiceRegistration<DistributionAgent> reg;
-
+    
     private MessageHandler<PackageMessage> packageHandler;
 
 
