SLING-9650 - Move non shared classes out of shared
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index a593365..b2962b4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -45,8 +45,6 @@
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.LocalStore;
-import org.apache.sling.distribution.journal.shared.PackageRetries;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/LocalStore.java
rename to src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
index 77130e3..e6db4e2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.bookkeeper;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/PackageRetries.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetries.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/shared/PackageRetries.java
rename to src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetries.java
index 743e9a4..b2bca6f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/PackageRetries.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetries.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.bookkeeper;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java
index 51d5466..0b475fb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.publisher;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index ae0c964..83a7dcf 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -47,12 +47,10 @@
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.shared.AgentState;
import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
import org.apache.sling.distribution.journal.shared.DistributionLogEventListener;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.shared.JMXRegistration;
-import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponse.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponse.java
index eb0510c..dc38234 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponse.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.publisher;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java b/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
deleted file mode 100644
index 9cae44e..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.shared;
-
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-
-import java.io.Closeable;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.sling.distribution.journal.FullMessage;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@ParametersAreNonnullByDefault
-public class LimitPoller {
- // Longer timeout for first message as it includes connecting to the journal
- private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(30);
-
- private final Logger log = LoggerFactory.getLogger(LimitPoller.class);
-
- private final long minOffset;
- private final long maxMessages;
- private final Closeable headPoller;
- private final Queue<FullMessage<PackageMessage>> messages;
- private final Semaphore nextMessage;
-
- public LimitPoller(MessagingProvider messagingProvider,
- String packageTopic,
- long minOffset,
- long maxMessages) {
- this.minOffset = minOffset;
- this.maxMessages = maxMessages;
- this.messages = new ConcurrentLinkedQueue<>();
- this.nextMessage = new Semaphore(0);
- String assign = messagingProvider.assignTo(minOffset);
- log.info("Fetching {} messages starting from {}", maxMessages, minOffset);
- headPoller = messagingProvider.createPoller(
- packageTopic, Reset.earliest, assign,
- create(PackageMessage.class, this::handlePackage)
- );
- }
-
- public List<FullMessage<PackageMessage>> fetch(Duration timeOut) {
- try {
- boolean timeout = nextMessage.tryAcquire(CONNECT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
- while (!timeout && this.messages.size() < maxMessages) {
- timeout = !nextMessage.tryAcquire(timeOut.toMillis(), TimeUnit.MILLISECONDS);
- }
- ArrayList<FullMessage<PackageMessage>> result = new ArrayList<>(messages);
- log.info("Fetched {} messages", result.size());
- return result;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e.getMessage(), e);
- } finally {
- IOUtils.closeQuietly(headPoller);
- }
- }
-
- private void handlePackage(MessageInfo info, PackageMessage message) {
- long offset = info.getOffset();
- log.debug("Reading offset {}", offset);
- if (this.messages.size() < maxMessages && info.getOffset() >= minOffset) {
- messages.add(new FullMessage<>(info, message));
- }
- nextMessage.release();
- }
-
-}
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreTest.java
similarity index 98%
rename from src/test/java/org/apache/sling/distribution/journal/shared/LocalStoreTest.java
rename to src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreTest.java
index 41a99b7..286147b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/LocalStoreTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.bookkeeper;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/PackageRetriesTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetriesTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/shared/PackageRetriesTest.java
rename to src/test/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetriesTest.java
index f1b5bf7..48bca57 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/PackageRetriesTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/PackageRetriesTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.bookkeeper;
import static org.junit.Assert.assertEquals;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponseTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponseTest.java
index 1abf03d..3bc1919 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/SimpleDistributionResponseTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.publisher;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index e034365..66dac34 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -68,6 +68,7 @@
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.bookkeeper.BookKeeper;
import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
+import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
@@ -76,7 +77,6 @@
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.LocalStore;
import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
deleted file mode 100644
index 9f3dce5..0000000
--- a/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.shared;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.samePropertyValuesAs;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.sling.distribution.journal.FullMessage;
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-public class LimitPollerTest {
-
- private static final int MIN_OFFSET = 10;
- private static final int MAX_MESSAGES = 2;
- private static final String TOPIC = "topic";
- private static final Duration TIMEOUT = Duration.ofMillis(100);
-
- @Mock
- private MessagingProvider clientProvider;
-
- @Captor
- private ArgumentCaptor<HandlerAdapter<org.apache.sling.distribution.journal.messages.PackageMessage>> handlerCaptor;
-
- @Mock
- private Closeable poller;
-
- private MessageHandler<PackageMessage> handler;
-
- @Before
- public void before() {
- MockitoAnnotations.initMocks(this);
- when(clientProvider.assignTo(MIN_OFFSET))
- .thenReturn("0:" + MIN_OFFSET);
- when(clientProvider.createPoller(
- Mockito.eq(TOPIC),
- Mockito.eq(Reset.earliest),
- Mockito.eq("0:" + MIN_OFFSET),
- handlerCaptor.capture()))
- .thenReturn(poller);
- }
-
- @After
- public void after() throws IOException {
- verify(poller).close();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void test() throws Exception {
- LimitPoller poller = new LimitPoller(clientProvider, TOPIC, MIN_OFFSET, MAX_MESSAGES);
- handler = handlerCaptor.getValue().getHandler();
- FullMessage<PackageMessage> message1 = simulateMessage(MIN_OFFSET);
- FullMessage<PackageMessage> message2 = simulateMessage(MIN_OFFSET + 1);
- simulateMessage(MIN_OFFSET + 2);
- List<FullMessage<PackageMessage>> actualMessages = poller.fetch(TIMEOUT);
- assertThat(actualMessages, contains(samePropertyValuesAs(message1), samePropertyValuesAs(message2)));
- }
-
- @Test
- public void testTimeout() throws Exception {
- LimitPoller poller = new LimitPoller(clientProvider, TOPIC, MIN_OFFSET, MAX_MESSAGES);
- handler = handlerCaptor.getValue().getHandler();
- FullMessage<PackageMessage> message1 = simulateMessage(MIN_OFFSET);
- List<FullMessage<PackageMessage>> actualMessages = poller.fetch(TIMEOUT);
- assertThat(actualMessages, contains(samePropertyValuesAs(message1)));
- }
-
- private FullMessage<PackageMessage> simulateMessage(int offset) {
- FullMessage<PackageMessage> message = createMessage(offset);
- handler.handle(message.getInfo(), message.getMessage());
- return message;
- }
-
- private FullMessage<PackageMessage> createMessage(int offset) {
- MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, System.currentTimeMillis());
- PackageMessage message = PackageMessage.builder()
- .pubAgentName("agent1")
- .pubSlingId("pub1SlingId")
- .pkgId("package-" + offset)
- .reqType(ReqType.ADD)
- .pkgType("journal")
- .paths(Arrays.asList("path"))
- .build();
- return new FullMessage<>(info, message);
- }
-
-}