SLING-8751 - Webconsole plugin to download content packages (#12)

* SLING-8751 - Webconsole plugin to download content packages

* SLING-8751 - Webconsole plugin to download content packages

* SLING-8751 - Webconsole plugin to download content packages

* SLING-8751 - Provide next page link, allow to set timeout, provide
proper concurrency support

* SLING-8751 - Fix sonar issues

* SLING-8751 - Use longer timeout for download, support large binaries

* SLING-8751 - Use different connect timeout, check for null at adaptTo
diff --git a/pom.xml b/pom.xml
index 3dac69a..edf6d2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,18 +137,21 @@
             <artifactId>org.apache.sling.distribution.journal.messages</artifactId>
             <version>0.1.2</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.webconsole</artifactId>
+            <version>4.3.16</version>
+        </dependency>
 
         <!-- OSGi -->
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>osgi.core</artifactId>
-            <version>6.0.0</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>osgi.cmpn</artifactId>
-            <version>6.0.0</version>
             <scope>provided</scope>
         </dependency>
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPoller.java
new file mode 100644
index 0000000..d21edf1
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPoller.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ParametersAreNonnullByDefault
+public class LimitPoller {
+    // Longer timeout for first message as it includes connecting to the journal
+    private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(30);
+
+    private final Logger log = LoggerFactory.getLogger(LimitPoller.class);
+    
+    private final long minOffset;
+    private final long maxMessages;
+    private final Closeable headPoller;
+    private final Queue<FullMessage<PackageMessage>> messages;
+    private final Semaphore nextMessage;
+    
+    public LimitPoller(MessagingProvider messagingProvider,
+                          String packageTopic,
+                          long minOffset,
+                          long maxMessages) {
+        this.minOffset = minOffset;
+        this.maxMessages = maxMessages;
+        this.messages = new ConcurrentLinkedQueue<>();
+        this.nextMessage = new Semaphore(0);
+        String assign = messagingProvider.assignTo(minOffset);
+        log.info("Fetching {} messages starting from {}", maxMessages, minOffset);
+        headPoller = messagingProvider.createPoller(
+                packageTopic, Reset.earliest, assign,
+                create(Messages.PackageMessage.class, this::handlePackage));
+    }
+
+    public List<FullMessage<PackageMessage>> fetch(Duration timeOut) {
+        try {
+            boolean timeout = nextMessage.tryAcquire(CONNECT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+            while (!timeout && this.messages.size() < maxMessages) {
+                timeout = !nextMessage.tryAcquire(timeOut.toMillis(), TimeUnit.MILLISECONDS);
+            }
+            ArrayList<FullMessage<PackageMessage>> result = new ArrayList<>(messages);
+            log.info("Fetched {} messages", result.size());
+            return result;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            IOUtils.closeQuietly(headPoller);
+        }
+    }
+
+    private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
+        long offset = info.getOffset();
+        log.debug("Reading offset {}", offset);
+        if (this.messages.size() < maxMessages && info.getOffset() >= minOffset) {
+            messages.add(new FullMessage<>(info, message));
+        }
+        nextMessage.release();
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
new file mode 100644
index 0000000..713537d
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.time.Duration;
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.jcr.Binary;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.queue.impl.LimitPoller;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+
+@Component(service = PackageBrowser.class)
+public class PackageBrowser {
+    
+    @Reference
+    private JournalAvailable journalAvailable;
+    
+    @Reference
+    private MessagingProvider messagingProvider;
+    
+    @Reference
+    private Topics topics;
+    
+    @Reference
+    ResourceResolverFactory resolverFactory;
+    
+    public List<FullMessage<PackageMessage>> getMessages(long startOffset, long numMessages, Duration timeout) {
+        LimitPoller poller = new LimitPoller(messagingProvider, topics.getPackageTopic(), startOffset, numMessages);
+        return poller.fetch(timeout);
+    }
+    
+    public void writeTo(PackageMessage pkgMsg, OutputStream os) throws IOException {
+        try (ResourceResolver resolver = getServiceResolver("importer")) {
+            InputStream is = pkgStream(resolver, pkgMsg);
+            IOUtils.copy(is, os);
+        } catch (LoginException | DistributionException e) {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+    
+    @Nonnull
+    public static InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
+        if (pkgMsg.hasPkgBinary()) {
+            return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
+        } else {
+            String pkgBinRef = pkgMsg.getPkgBinaryRef();
+            try {
+                Session session = resolver.adaptTo(Session.class);
+                if (session == null) {
+                    throw new DistributionException("Unable to get Oak session");
+                }
+                ValueFactory factory = session.getValueFactory();
+                Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
+                return binary.getStream();
+            } catch (RepositoryException e) {
+                throw new DistributionException(e.getMessage(), e);
+            }
+        }
+    }
+    
+    private ResourceResolver getServiceResolver(String subService) throws LoginException {
+        return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
new file mode 100644
index 0000000..0ba1ce0
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.felix.webconsole.AbstractWebConsolePlugin;
+import org.apache.felix.webconsole.WebConsoleConstants;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.osgi.framework.Constants;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(service = Servlet.class,
+        property = {
+        Constants.SERVICE_DESCRIPTION + "=" + "Package viewer for content distribution",
+        WebConsoleConstants.PLUGIN_LABEL + "=" + PackageViewerPlugin.LABEL,
+        WebConsoleConstants.PLUGIN_TITLE + "=" + PackageViewerPlugin.TITLE
+})
+public class PackageViewerPlugin extends AbstractWebConsolePlugin {
+    private static final Duration BROWSE_TIMEOUT = Duration.ofMillis(1000);
+    private static final Duration DOWNLOAD_TIMEOUT = Duration.ofMillis(20000);
+    private static final int NOT_FOUND = 404;
+    private static final int MAX_NUM_MESSAGES = 100;
+    private static final long serialVersionUID = -3113699912185558439L;
+    protected static final String LABEL = "distpackages";
+    protected static final String TITLE = "Distribution Package Viewer";
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    @Reference
+    PackageBrowser packageBrowser; //NOSONAR
+    
+    @Override
+    public String getLabel() {
+        return LABEL;
+    }
+
+    @Override
+    public String getTitle() {
+        return TITLE;
+    }
+
+    @Override
+    public String getCategory() {
+        return "Sling";
+    }
+    
+    @Override
+    protected void renderContent(HttpServletRequest req, HttpServletResponse res)
+            throws ServletException, IOException {
+        Optional<Long> offset = getOffset(req);
+        if (!offset.isPresent()) {
+            String startOffsetSt = req.getParameter("startOffset");
+            long startOffset = startOffsetSt != null ? new Long(startOffsetSt) : 0;
+            renderPackageList(startOffset, res.getWriter());
+        } else {
+            writePackage(offset.get(), res);
+        }
+    }
+
+    private void renderPackageList(long startOffset, PrintWriter writer) {
+        writer.println("<table class=\"tablesorter nicetable noauto\">");
+        writer.println("<tr><th>Id</th><th>Offset</th><th>Type</th><th>Paths</th></tr>");
+        List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(startOffset, MAX_NUM_MESSAGES, BROWSE_TIMEOUT);
+        msgs.stream().filter(this::notTestMessage).map(this::writeMsg).forEach(writer::println);
+        writer.println("</table>");
+        if (msgs.size() == MAX_NUM_MESSAGES) {
+            FullMessage<PackageMessage> lastMsg = msgs.get(msgs.size() - 1);
+            long nextOffset = lastMsg.getInfo().getOffset() + 1;
+            writer.println(String.format("<p><a href =\"%s?startOffset=%d\">next page</a>",
+                    LABEL,
+                    nextOffset));
+        }
+    }
+
+    private String writeMsg(FullMessage<PackageMessage> msg) {
+        return String.format("<tr><td><a href=\"%s/%d\">%s</a></td><td>%d</td><td>%s</td><td>%s</td></tr>",
+                LABEL,
+                msg.getInfo().getOffset(),
+                msg.getMessage().getPkgId(),
+                msg.getInfo().getOffset(),
+                msg.getMessage().getReqType(),
+                msg.getMessage().getPathsList().toString());
+    }
+
+    private void writePackage(Long offset, HttpServletResponse res) throws IOException {
+        log.info("Retrieving package with offset {}", offset);
+        List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(offset, 1, DOWNLOAD_TIMEOUT);
+        if (!msgs.isEmpty()) {
+            PackageMessage msg = msgs.iterator().next().getMessage();
+            res.setHeader("Content-Type", "application/octet-stream");
+            String filename = msg.getPkgId() + ".zip";
+            res.setHeader("Content-Disposition" , "inline; filename=\"" + filename + "\"");
+            packageBrowser.writeTo(msg, res.getOutputStream());
+        } else {
+            res.setStatus(NOT_FOUND);
+        }
+    }
+    
+    @Override
+    protected boolean isHtmlRequest(HttpServletRequest request) {
+        return !getOffset(request).isPresent();
+    }
+
+    private Optional<Long> getOffset(HttpServletRequest req) {
+        int startIndex = LABEL.length() + 2;
+        if (startIndex <= req.getPathInfo().length()) {
+            String offsetSt = req.getPathInfo().substring(startIndex);
+            return Optional.of(new Long(offsetSt));
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    private boolean notTestMessage(FullMessage<PackageMessage> msg) {
+        return msg.getMessage().getReqType() != ReqType.TEST;
+    }
+    
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 3a63f20..1694e38 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -34,7 +34,6 @@
 import static java.util.stream.Collectors.toSet;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
 
-import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.InputStream;
 import java.util.Collections;
@@ -52,13 +51,10 @@
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
-import javax.jcr.Binary;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.ValueFactory;
 
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -67,7 +63,6 @@
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
 import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
@@ -644,7 +639,7 @@
         LOG.info("Importing paths " + pkgMsg.getPathsList());
         InputStream pkgStream = null;
         try {
-            pkgStream = pkgStream(resolver, pkgMsg);
+            pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
             packageBuilder.installPackage(resolver, pkgStream);
             extractor.handle(resolver, pkgMsg.getPathsList());
         } finally {
@@ -712,22 +707,6 @@
         LOG.info("Sent status message {}", status);
     }
 
-    @Nonnull
-    private InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
-        if (pkgMsg.hasPkgBinary()) {
-            return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
-        } else {
-            String pkgBinRef = pkgMsg.getPkgBinaryRef();
-            try {
-                ValueFactory factory = resolver.adaptTo(Session.class).getValueFactory();
-                Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
-                return binary.getStream();
-            } catch (RepositoryException e) {
-                throw new DistributionException(e.getMessage(), e);
-            }
-        }
-    }
-
     private void handleCommandMessage(MessageInfo info, CommandMessage message) {
         if (subSlingId.equals(message.getSubSlingId()) && subAgentName.equals(message.getSubAgentName())) {
             if (message.hasClearCommand()) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPollerTest.java
new file mode 100644
index 0000000..15d2c1f
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPollerTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.samePropertyValuesAs;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+
+public class LimitPollerTest {
+
+    private static final int MIN_OFFSET = 10;
+    private static final int MAX_MESSAGES = 2;
+    private static final String TOPIC = "topic";
+    private static final Duration TIMEOUT = Duration.ofMillis(100);
+    
+    @Mock
+    private MessagingProvider clientProvider;
+    
+    @Captor
+    private ArgumentCaptor<HandlerAdapter<Messages.PackageMessage>> handlerCaptor;
+    
+    @Mock
+    private Closeable poller;
+    
+    private MessageHandler<PackageMessage> handler;
+    
+    @Before
+    public void before() {
+        MockitoAnnotations.initMocks(this);
+        when(clientProvider.assignTo(MIN_OFFSET))
+                .thenReturn("0:" + MIN_OFFSET);
+        when(clientProvider.createPoller(
+                Mockito.eq(TOPIC), 
+                Mockito.eq(Reset.earliest),
+                Mockito.eq("0:" + MIN_OFFSET),
+                handlerCaptor.capture()))
+        .thenReturn(poller);
+    }
+
+    @After
+    public void after() throws IOException {
+        verify(poller).close();
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void test() throws Exception {
+        LimitPoller poller = new LimitPoller(clientProvider, TOPIC, MIN_OFFSET, MAX_MESSAGES);
+        handler = handlerCaptor.getValue().getHandler();
+        FullMessage<PackageMessage> message1 = simulateMessage(MIN_OFFSET);
+        FullMessage<PackageMessage> message2 = simulateMessage(MIN_OFFSET + 1);
+        simulateMessage(MIN_OFFSET + 2);
+        List<FullMessage<PackageMessage>> actualMessages = poller.fetch(TIMEOUT);
+        assertThat(actualMessages, contains(samePropertyValuesAs(message1), samePropertyValuesAs(message2)));
+    }
+    
+    @Test
+    public void testTimeout() throws Exception {
+        LimitPoller poller = new LimitPoller(clientProvider, TOPIC, MIN_OFFSET, MAX_MESSAGES);
+        handler = handlerCaptor.getValue().getHandler();
+        FullMessage<PackageMessage> message1 = simulateMessage(MIN_OFFSET);
+        List<FullMessage<PackageMessage>> actualMessages = poller.fetch(TIMEOUT);
+        assertThat(actualMessages, contains(samePropertyValuesAs(message1)));
+    }
+    
+    private FullMessage<PackageMessage> simulateMessage(int offset) {
+        FullMessage<PackageMessage> message = createMessage(offset);
+        handler.handle(message.getInfo(), message.getMessage());
+        return message;
+    }
+
+    private FullMessage<Messages.PackageMessage> createMessage(int offset) {
+        MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, System.currentTimeMillis());
+        PackageMessage message = Messages.PackageMessage.newBuilder()
+                .setPubAgentName("agent1")
+                .setPubSlingId("pub1SlingId")
+                .setPkgId("package-" + offset)
+                .setReqType(ReqType.ADD)
+                .setPkgType("journal")
+                .addPaths("path")
+                .build();
+        return new FullMessage<Messages.PackageMessage>(info, message);
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
new file mode 100644
index 0000000..14dcc19
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+
+import javax.jcr.Binary;
+import javax.jcr.Session;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PackageBrowserTest {
+    private static final String DATA = "test";
+
+    @Mock
+    ResourceResolverFactory resolverFactory;
+
+    @Mock
+    ResourceResolver resolver;
+
+    @Mock
+    Session session;
+
+    @Mock
+    ValueFactory valueFactory;
+     
+    @Mock
+    Value value;
+
+    @InjectMocks
+    PackageBrowser packageBrowser;
+
+    @Mock
+    Binary binary;
+
+    @Test
+    public void testWriteTo() throws Exception {
+        when(resolverFactory.getServiceResourceResolver(Mockito.any())).thenReturn(resolver);
+        when(resolver.adaptTo(Session.class)).thenReturn(session);
+        when(session.getValueFactory()).thenReturn(valueFactory);
+        when(valueFactory.createValue(Mockito.any(SimpleReferenceBinary.class))).thenReturn(value);
+        when(value.getBinary()).thenReturn(binary);
+        ByteArrayInputStream is = new ByteArrayInputStream(DATA.getBytes(Charset.forName("utf-8")));
+        when(binary.getStream()).thenReturn(is);
+        PackageMessage pkgMsg = createPackageMsg(0l);
+        ByteArrayOutputStream bao = new ByteArrayOutputStream();
+        packageBrowser.writeTo(pkgMsg, bao);
+        String resultSt = bao.toString("utf-8");
+        assertThat(resultSt,  equalTo(DATA));
+    }
+
+    private PackageMessage createPackageMsg(long offset) throws Exception {
+        return PackageMessage.newBuilder().setPubSlingId("").setReqType(ReqType.ADD)
+                .addPaths("/content")
+                .setPkgId("pkgid")
+                .setPkgType("some_type")
+                .setPkgBinaryRef("myref").build();
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
new file mode 100644
index 0000000..aa2947a
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.google.protobuf.ByteString;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PackageViewerPluginTest {
+
+    @Spy
+    Topics topics = new Topics();
+
+    @Mock
+    private HttpServletRequest req;
+    
+    @Mock
+    private HttpServletResponse res;
+    
+    @Mock
+    MessagingProvider messagingProvider;
+    
+    @Mock
+    ServletOutputStream servletOutputStream;
+    
+    @Mock
+    PackageBrowser packageBrowser;
+    
+    @InjectMocks
+    PackageViewerPlugin viewer;
+
+    private StringWriter outWriter;
+
+    @Before
+    public void before() throws IOException {
+        FullMessage<PackageMessage> msg1 = createPackageMsg(1l);
+        List<FullMessage<PackageMessage>> messages  = Arrays.asList(msg1);
+        doReturn(messages).when(packageBrowser).getMessages(Mockito.eq(1l), Mockito.anyLong(), Mockito.any());
+        doReturn(messages).when(packageBrowser).getMessages(Mockito.eq(0l), Mockito.anyLong(), Mockito.any());
+        doReturn(emptyList()).when(packageBrowser).getMessages(Mockito.eq(2l), Mockito.anyLong(), Mockito.any());
+
+        outWriter = new StringWriter();
+        when(res.getWriter()).thenReturn(new PrintWriter(outWriter));
+        when(res.getOutputStream()).thenReturn(servletOutputStream);
+    }
+
+    @Test
+    public void testSimple() {
+        assertThat(viewer.getLabel(), equalTo("distpackages"));
+        assertThat(viewer.getTitle(), equalTo("Distribution Package Viewer"));
+        assertThat(viewer.getCategory(), equalTo("Sling"));
+    }
+    
+    @Test
+    public void testPackageList() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages");
+        
+        viewer.renderContent(req, res);
+
+        String outString = outWriter.getBuffer().toString();
+        System.out.println(outString);
+        assertThat(outString, 
+                containsString("<tr><td><a href=\"distpackages/1\">pkgid</a></td><td>1</td><td>ADD</td><td>[/content]</td></tr>"));
+    }
+
+    @Test
+    public void testGetPackage() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages/1");
+        
+        viewer.renderContent(req, res);
+        
+        verify(packageBrowser).getMessages(Mockito.eq(1l), Mockito.eq(1l), Mockito.any());
+    }
+    
+    @Test
+    public void testGetPackageNotFound() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages/2");
+        
+        viewer.renderContent(req, res);
+        
+        verify(res).setStatus(Mockito.eq(404));
+        verify(packageBrowser).getMessages(Mockito.eq(2l), Mockito.eq(1l), Mockito.any());
+    }
+    
+    @Test
+    public void testIsHtmlPackage() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages/1");
+
+        assertThat(viewer.isHtmlRequest(req), equalTo(false));
+    }
+
+    @Test
+    public void testIsHtmlMain() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages");
+
+        assertThat(viewer.isHtmlRequest(req), equalTo(true));
+    }
+
+    private FullMessage<PackageMessage> createPackageMsg(long offset) {
+        MessageInfo info = new TestMessageInfo("topic", 0 , offset, 0l);
+        PackageMessage message = PackageMessage.newBuilder()
+                .setPubSlingId("")
+                .setReqType(ReqType.ADD)
+                .addPaths("/content")
+                .setPkgId("pkgid")
+                .setPkgType("some_type")
+                .setPkgBinary(ByteString.copyFrom("package content", Charset.defaultCharset()))
+                .build();
+        FullMessage<PackageMessage> msg = new FullMessage<PackageMessage>(info, message);
+        return msg;
+    }
+
+}