SLING-9650 - Move non shared classes out of shared
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index a593365..b2962b4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -45,8 +45,6 @@
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.LocalStore;
-import org.apache.sling.distribution.journal.shared.PackageRetries;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/LocalStore.java
rename to src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
index 77130e3..e6db4e2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.bookkeeper;
 
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.ModifiableValueMap;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/PackageRetries.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetries.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/shared/PackageRetries.java
rename to src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetries.java
index 743e9a4..b2bca6f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/PackageRetries.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetries.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.bookkeeper;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java
index 51d5466..0b475fb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.publisher;
 
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index ae0c964..83a7dcf 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -47,12 +47,10 @@
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.shared.AgentState;
 import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
 import org.apache.sling.distribution.journal.shared.DistributionLogEventListener;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.JMXRegistration;
-import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponse.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponse.java
index eb0510c..dc38234 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponse.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.publisher;
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java b/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
deleted file mode 100644
index 9cae44e..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.shared;
-
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-
-import java.io.Closeable;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.sling.distribution.journal.FullMessage;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@ParametersAreNonnullByDefault
-public class LimitPoller {
-    // Longer timeout for first message as it includes connecting to the journal
-    private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(30);
-
-    private final Logger log = LoggerFactory.getLogger(LimitPoller.class);
-    
-    private final long minOffset;
-    private final long maxMessages;
-    private final Closeable headPoller;
-    private final Queue<FullMessage<PackageMessage>> messages;
-    private final Semaphore nextMessage;
-    
-    public LimitPoller(MessagingProvider messagingProvider,
-                          String packageTopic,
-                          long minOffset,
-                          long maxMessages) {
-        this.minOffset = minOffset;
-        this.maxMessages = maxMessages;
-        this.messages = new ConcurrentLinkedQueue<>();
-        this.nextMessage = new Semaphore(0);
-        String assign = messagingProvider.assignTo(minOffset);
-        log.info("Fetching {} messages starting from {}", maxMessages, minOffset);
-        headPoller = messagingProvider.createPoller(
-                packageTopic, Reset.earliest, assign,
-                create(PackageMessage.class, this::handlePackage)
-                );
-    }
-
-    public List<FullMessage<PackageMessage>> fetch(Duration timeOut) {
-        try {
-            boolean timeout = nextMessage.tryAcquire(CONNECT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
-            while (!timeout && this.messages.size() < maxMessages) {
-                timeout = !nextMessage.tryAcquire(timeOut.toMillis(), TimeUnit.MILLISECONDS);
-            }
-            ArrayList<FullMessage<PackageMessage>> result = new ArrayList<>(messages);
-            log.info("Fetched {} messages", result.size());
-            return result;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e.getMessage(), e);
-        } finally {
-            IOUtils.closeQuietly(headPoller);
-        }
-    }
-
-    private void handlePackage(MessageInfo info, PackageMessage message) {
-        long offset = info.getOffset();
-        log.debug("Reading offset {}", offset);
-        if (this.messages.size() < maxMessages && info.getOffset() >= minOffset) {
-            messages.add(new FullMessage<>(info, message));
-        }
-        nextMessage.release();
-    }
-
-}
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreTest.java
similarity index 98%
rename from src/test/java/org/apache/sling/distribution/journal/shared/LocalStoreTest.java
rename to src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreTest.java
index 41a99b7..286147b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/LocalStoreTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.bookkeeper;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/PackageRetriesTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetriesTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/shared/PackageRetriesTest.java
rename to src/test/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetriesTest.java
index f1b5bf7..48bca57 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/PackageRetriesTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetriesTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.bookkeeper;
 
 import static org.junit.Assert.assertEquals;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponseTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponseTest.java
index 1abf03d..3bc1919 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponseTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.publisher;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index e034365..66dac34 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -68,6 +68,7 @@
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.bookkeeper.BookKeeper;
 import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
+import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
@@ -76,7 +77,6 @@
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.LocalStore;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
deleted file mode 100644
index 9f3dce5..0000000
--- a/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.shared;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.samePropertyValuesAs;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.sling.distribution.journal.FullMessage;
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-public class LimitPollerTest {
-
-    private static final int MIN_OFFSET = 10;
-    private static final int MAX_MESSAGES = 2;
-    private static final String TOPIC = "topic";
-    private static final Duration TIMEOUT = Duration.ofMillis(100);
-    
-    @Mock
-    private MessagingProvider clientProvider;
-    
-    @Captor
-    private ArgumentCaptor<HandlerAdapter<org.apache.sling.distribution.journal.messages.PackageMessage>> handlerCaptor;
-    
-    @Mock
-    private Closeable poller;
-    
-    private MessageHandler<PackageMessage> handler;
-    
-    @Before
-    public void before() {
-        MockitoAnnotations.initMocks(this);
-        when(clientProvider.assignTo(MIN_OFFSET))
-                .thenReturn("0:" + MIN_OFFSET);
-        when(clientProvider.createPoller(
-                Mockito.eq(TOPIC), 
-                Mockito.eq(Reset.earliest),
-                Mockito.eq("0:" + MIN_OFFSET),
-                handlerCaptor.capture()))
-        .thenReturn(poller);
-    }
-
-    @After
-    public void after() throws IOException {
-        verify(poller).close();
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Test
-    public void test() throws Exception {
-        LimitPoller poller = new LimitPoller(clientProvider, TOPIC, MIN_OFFSET, MAX_MESSAGES);
-        handler = handlerCaptor.getValue().getHandler();
-        FullMessage<PackageMessage> message1 = simulateMessage(MIN_OFFSET);
-        FullMessage<PackageMessage> message2 = simulateMessage(MIN_OFFSET + 1);
-        simulateMessage(MIN_OFFSET + 2);
-        List<FullMessage<PackageMessage>> actualMessages = poller.fetch(TIMEOUT);
-        assertThat(actualMessages, contains(samePropertyValuesAs(message1), samePropertyValuesAs(message2)));
-    }
-    
-    @Test
-    public void testTimeout() throws Exception {
-        LimitPoller poller = new LimitPoller(clientProvider, TOPIC, MIN_OFFSET, MAX_MESSAGES);
-        handler = handlerCaptor.getValue().getHandler();
-        FullMessage<PackageMessage> message1 = simulateMessage(MIN_OFFSET);
-        List<FullMessage<PackageMessage>> actualMessages = poller.fetch(TIMEOUT);
-        assertThat(actualMessages, contains(samePropertyValuesAs(message1)));
-    }
-    
-    private FullMessage<PackageMessage> simulateMessage(int offset) {
-        FullMessage<PackageMessage> message = createMessage(offset);
-        handler.handle(message.getInfo(), message.getMessage());
-        return message;
-    }
-
-    private FullMessage<PackageMessage> createMessage(int offset) {
-        MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, System.currentTimeMillis());
-        PackageMessage message = PackageMessage.builder()
-                .pubAgentName("agent1")
-                .pubSlingId("pub1SlingId")
-                .pkgId("package-" + offset)
-                .reqType(ReqType.ADD)
-                .pkgType("journal")
-                .paths(Arrays.asList("path"))
-                .build();
-        return new FullMessage<>(info, message);
-    }
-
-}