SLING-8751 - Webconsole plugin to download content packages (#12)
* SLING-8751 - Webconsole plugin to download content packages
* SLING-8751 - Webconsole plugin to download content packages
* SLING-8751 - Webconsole plugin to download content packages
* SLING-8751 - Provide next page link, allow to set timeout, provide
proper concurrency support
* SLING-8751 - Fix sonar issues
* SLING-8751 - Use longer timeout for download, support large binaries
* SLING-8751 - Use different connect timeout, check for null at adaptTo
diff --git a/pom.xml b/pom.xml
index 3dac69a..edf6d2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,18 +137,21 @@
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
<version>0.1.2</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.webconsole</artifactId>
+ <version>4.3.16</version>
+ </dependency>
<!-- OSGi -->
<dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.core</artifactId>
- <version>6.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.cmpn</artifactId>
- <version>6.0.0</version>
<scope>provided</scope>
</dependency>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPoller.java
new file mode 100644
index 0000000..d21edf1
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPoller.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ParametersAreNonnullByDefault
+public class LimitPoller {
+ // Longer timeout for first message as it includes connecting to the journal
+ private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(30);
+
+ private final Logger log = LoggerFactory.getLogger(LimitPoller.class);
+
+ private final long minOffset;
+ private final long maxMessages;
+ private final Closeable headPoller;
+ private final Queue<FullMessage<PackageMessage>> messages;
+ private final Semaphore nextMessage;
+
+ public LimitPoller(MessagingProvider messagingProvider,
+ String packageTopic,
+ long minOffset,
+ long maxMessages) {
+ this.minOffset = minOffset;
+ this.maxMessages = maxMessages;
+ this.messages = new ConcurrentLinkedQueue<>();
+ this.nextMessage = new Semaphore(0);
+ String assign = messagingProvider.assignTo(minOffset);
+ log.info("Fetching {} messages starting from {}", maxMessages, minOffset);
+ headPoller = messagingProvider.createPoller(
+ packageTopic, Reset.earliest, assign,
+ create(Messages.PackageMessage.class, this::handlePackage));
+ }
+
+ public List<FullMessage<PackageMessage>> fetch(Duration timeOut) {
+ try {
+ boolean timeout = nextMessage.tryAcquire(CONNECT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+ while (!timeout && this.messages.size() < maxMessages) {
+ timeout = !nextMessage.tryAcquire(timeOut.toMillis(), TimeUnit.MILLISECONDS);
+ }
+ ArrayList<FullMessage<PackageMessage>> result = new ArrayList<>(messages);
+ log.info("Fetched {} messages", result.size());
+ return result;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e.getMessage(), e);
+ } finally {
+ IOUtils.closeQuietly(headPoller);
+ }
+ }
+
+ private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
+ long offset = info.getOffset();
+ log.debug("Reading offset {}", offset);
+ if (this.messages.size() < maxMessages && info.getOffset() >= minOffset) {
+ messages.add(new FullMessage<>(info, message));
+ }
+ nextMessage.release();
+ }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
new file mode 100644
index 0000000..713537d
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.time.Duration;
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.jcr.Binary;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.queue.impl.LimitPoller;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+
+@Component(service = PackageBrowser.class)
+public class PackageBrowser {
+
+ @Reference
+ private JournalAvailable journalAvailable;
+
+ @Reference
+ private MessagingProvider messagingProvider;
+
+ @Reference
+ private Topics topics;
+
+ @Reference
+ ResourceResolverFactory resolverFactory;
+
+ public List<FullMessage<PackageMessage>> getMessages(long startOffset, long numMessages, Duration timeout) {
+ LimitPoller poller = new LimitPoller(messagingProvider, topics.getPackageTopic(), startOffset, numMessages);
+ return poller.fetch(timeout);
+ }
+
+ public void writeTo(PackageMessage pkgMsg, OutputStream os) throws IOException {
+ try (ResourceResolver resolver = getServiceResolver("importer")) {
+ InputStream is = pkgStream(resolver, pkgMsg);
+ IOUtils.copy(is, os);
+ } catch (LoginException | DistributionException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ @Nonnull
+ public static InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
+ if (pkgMsg.hasPkgBinary()) {
+ return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
+ } else {
+ String pkgBinRef = pkgMsg.getPkgBinaryRef();
+ try {
+ Session session = resolver.adaptTo(Session.class);
+ if (session == null) {
+ throw new DistributionException("Unable to get Oak session");
+ }
+ ValueFactory factory = session.getValueFactory();
+ Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
+ return binary.getStream();
+ } catch (RepositoryException e) {
+ throw new DistributionException(e.getMessage(), e);
+ }
+ }
+ }
+
+ private ResourceResolver getServiceResolver(String subService) throws LoginException {
+ return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
new file mode 100644
index 0000000..0ba1ce0
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.felix.webconsole.AbstractWebConsolePlugin;
+import org.apache.felix.webconsole.WebConsoleConstants;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.osgi.framework.Constants;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(service = Servlet.class,
+ property = {
+ Constants.SERVICE_DESCRIPTION + "=" + "Package viewer for content distribution",
+ WebConsoleConstants.PLUGIN_LABEL + "=" + PackageViewerPlugin.LABEL,
+ WebConsoleConstants.PLUGIN_TITLE + "=" + PackageViewerPlugin.TITLE
+})
+public class PackageViewerPlugin extends AbstractWebConsolePlugin {
+ private static final Duration BROWSE_TIMEOUT = Duration.ofMillis(1000);
+ private static final Duration DOWNLOAD_TIMEOUT = Duration.ofMillis(20000);
+ private static final int NOT_FOUND = 404;
+ private static final int MAX_NUM_MESSAGES = 100;
+ private static final long serialVersionUID = -3113699912185558439L;
+ protected static final String LABEL = "distpackages";
+ protected static final String TITLE = "Distribution Package Viewer";
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Reference
+ PackageBrowser packageBrowser; //NOSONAR
+
+ @Override
+ public String getLabel() {
+ return LABEL;
+ }
+
+ @Override
+ public String getTitle() {
+ return TITLE;
+ }
+
+ @Override
+ public String getCategory() {
+ return "Sling";
+ }
+
+ @Override
+ protected void renderContent(HttpServletRequest req, HttpServletResponse res)
+ throws ServletException, IOException {
+ Optional<Long> offset = getOffset(req);
+ if (!offset.isPresent()) {
+ String startOffsetSt = req.getParameter("startOffset");
+ long startOffset = startOffsetSt != null ? new Long(startOffsetSt) : 0;
+ renderPackageList(startOffset, res.getWriter());
+ } else {
+ writePackage(offset.get(), res);
+ }
+ }
+
+ private void renderPackageList(long startOffset, PrintWriter writer) {
+ writer.println("<table class=\"tablesorter nicetable noauto\">");
+ writer.println("<tr><th>Id</th><th>Offset</th><th>Type</th><th>Paths</th></tr>");
+ List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(startOffset, MAX_NUM_MESSAGES, BROWSE_TIMEOUT);
+ msgs.stream().filter(this::notTestMessage).map(this::writeMsg).forEach(writer::println);
+ writer.println("</table>");
+ if (msgs.size() == MAX_NUM_MESSAGES) {
+ FullMessage<PackageMessage> lastMsg = msgs.get(msgs.size() - 1);
+ long nextOffset = lastMsg.getInfo().getOffset() + 1;
+ writer.println(String.format("<p><a href =\"%s?startOffset=%d\">next page</a>",
+ LABEL,
+ nextOffset));
+ }
+ }
+
+ private String writeMsg(FullMessage<PackageMessage> msg) {
+ return String.format("<tr><td><a href=\"%s/%d\">%s</a></td><td>%d</td><td>%s</td><td>%s</td></tr>",
+ LABEL,
+ msg.getInfo().getOffset(),
+ msg.getMessage().getPkgId(),
+ msg.getInfo().getOffset(),
+ msg.getMessage().getReqType(),
+ msg.getMessage().getPathsList().toString());
+ }
+
+ private void writePackage(Long offset, HttpServletResponse res) throws IOException {
+ log.info("Retrieving package with offset {}", offset);
+ List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(offset, 1, DOWNLOAD_TIMEOUT);
+ if (!msgs.isEmpty()) {
+ PackageMessage msg = msgs.iterator().next().getMessage();
+ res.setHeader("Content-Type", "application/octet-stream");
+ String filename = msg.getPkgId() + ".zip";
+ res.setHeader("Content-Disposition" , "inline; filename=\"" + filename + "\"");
+ packageBrowser.writeTo(msg, res.getOutputStream());
+ } else {
+ res.setStatus(NOT_FOUND);
+ }
+ }
+
+ @Override
+ protected boolean isHtmlRequest(HttpServletRequest request) {
+ return !getOffset(request).isPresent();
+ }
+
+ private Optional<Long> getOffset(HttpServletRequest req) {
+ int startIndex = LABEL.length() + 2;
+ if (startIndex <= req.getPathInfo().length()) {
+ String offsetSt = req.getPathInfo().substring(startIndex);
+ return Optional.of(new Long(offsetSt));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ private boolean notTestMessage(FullMessage<PackageMessage> msg) {
+ return msg.getMessage().getReqType() != ReqType.TEST;
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 3a63f20..1694e38 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -34,7 +34,6 @@
import static java.util.stream.Collectors.toSet;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
-import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.InputStream;
import java.util.Collections;
@@ -52,13 +51,10 @@
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import javax.jcr.Binary;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.ValueFactory;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -67,7 +63,6 @@
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
@@ -644,7 +639,7 @@
LOG.info("Importing paths " + pkgMsg.getPathsList());
InputStream pkgStream = null;
try {
- pkgStream = pkgStream(resolver, pkgMsg);
+ pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
packageBuilder.installPackage(resolver, pkgStream);
extractor.handle(resolver, pkgMsg.getPathsList());
} finally {
@@ -712,22 +707,6 @@
LOG.info("Sent status message {}", status);
}
- @Nonnull
- private InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
- if (pkgMsg.hasPkgBinary()) {
- return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
- } else {
- String pkgBinRef = pkgMsg.getPkgBinaryRef();
- try {
- ValueFactory factory = resolver.adaptTo(Session.class).getValueFactory();
- Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
- return binary.getStream();
- } catch (RepositoryException e) {
- throw new DistributionException(e.getMessage(), e);
- }
- }
- }
-
private void handleCommandMessage(MessageInfo info, CommandMessage message) {
if (subSlingId.equals(message.getSubSlingId()) && subAgentName.equals(message.getSubAgentName())) {
if (message.hasClearCommand()) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPollerTest.java
new file mode 100644
index 0000000..15d2c1f
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/LimitPollerTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.samePropertyValuesAs;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+
+public class LimitPollerTest {
+
+ private static final int MIN_OFFSET = 10;
+ private static final int MAX_MESSAGES = 2;
+ private static final String TOPIC = "topic";
+ private static final Duration TIMEOUT = Duration.ofMillis(100);
+
+ @Mock
+ private MessagingProvider clientProvider;
+
+ @Captor
+ private ArgumentCaptor<HandlerAdapter<Messages.PackageMessage>> handlerCaptor;
+
+ @Mock
+ private Closeable poller;
+
+ private MessageHandler<PackageMessage> handler;
+
+ @Before
+ public void before() {
+ MockitoAnnotations.initMocks(this);
+ when(clientProvider.assignTo(MIN_OFFSET))
+ .thenReturn("0:" + MIN_OFFSET);
+ when(clientProvider.createPoller(
+ Mockito.eq(TOPIC),
+ Mockito.eq(Reset.earliest),
+ Mockito.eq("0:" + MIN_OFFSET),
+ handlerCaptor.capture()))
+ .thenReturn(poller);
+ }
+
+ @After
+ public void after() throws IOException {
+ verify(poller).close();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void test() throws Exception {
+ LimitPoller poller = new LimitPoller(clientProvider, TOPIC, MIN_OFFSET, MAX_MESSAGES);
+ handler = handlerCaptor.getValue().getHandler();
+ FullMessage<PackageMessage> message1 = simulateMessage(MIN_OFFSET);
+ FullMessage<PackageMessage> message2 = simulateMessage(MIN_OFFSET + 1);
+ simulateMessage(MIN_OFFSET + 2);
+ List<FullMessage<PackageMessage>> actualMessages = poller.fetch(TIMEOUT);
+ assertThat(actualMessages, contains(samePropertyValuesAs(message1), samePropertyValuesAs(message2)));
+ }
+
+ @Test
+ public void testTimeout() throws Exception {
+ LimitPoller poller = new LimitPoller(clientProvider, TOPIC, MIN_OFFSET, MAX_MESSAGES);
+ handler = handlerCaptor.getValue().getHandler();
+ FullMessage<PackageMessage> message1 = simulateMessage(MIN_OFFSET);
+ List<FullMessage<PackageMessage>> actualMessages = poller.fetch(TIMEOUT);
+ assertThat(actualMessages, contains(samePropertyValuesAs(message1)));
+ }
+
+ private FullMessage<PackageMessage> simulateMessage(int offset) {
+ FullMessage<PackageMessage> message = createMessage(offset);
+ handler.handle(message.getInfo(), message.getMessage());
+ return message;
+ }
+
+ private FullMessage<Messages.PackageMessage> createMessage(int offset) {
+ MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, System.currentTimeMillis());
+ PackageMessage message = Messages.PackageMessage.newBuilder()
+ .setPubAgentName("agent1")
+ .setPubSlingId("pub1SlingId")
+ .setPkgId("package-" + offset)
+ .setReqType(ReqType.ADD)
+ .setPkgType("journal")
+ .addPaths("path")
+ .build();
+ return new FullMessage<Messages.PackageMessage>(info, message);
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
new file mode 100644
index 0000000..14dcc19
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+
+import javax.jcr.Binary;
+import javax.jcr.Session;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PackageBrowserTest {
+ private static final String DATA = "test";
+
+ @Mock
+ ResourceResolverFactory resolverFactory;
+
+ @Mock
+ ResourceResolver resolver;
+
+ @Mock
+ Session session;
+
+ @Mock
+ ValueFactory valueFactory;
+
+ @Mock
+ Value value;
+
+ @InjectMocks
+ PackageBrowser packageBrowser;
+
+ @Mock
+ Binary binary;
+
+ @Test
+ public void testWriteTo() throws Exception {
+ when(resolverFactory.getServiceResourceResolver(Mockito.any())).thenReturn(resolver);
+ when(resolver.adaptTo(Session.class)).thenReturn(session);
+ when(session.getValueFactory()).thenReturn(valueFactory);
+ when(valueFactory.createValue(Mockito.any(SimpleReferenceBinary.class))).thenReturn(value);
+ when(value.getBinary()).thenReturn(binary);
+ ByteArrayInputStream is = new ByteArrayInputStream(DATA.getBytes(Charset.forName("utf-8")));
+ when(binary.getStream()).thenReturn(is);
+ PackageMessage pkgMsg = createPackageMsg(0l);
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ packageBrowser.writeTo(pkgMsg, bao);
+ String resultSt = bao.toString("utf-8");
+ assertThat(resultSt, equalTo(DATA));
+ }
+
+ private PackageMessage createPackageMsg(long offset) throws Exception {
+ return PackageMessage.newBuilder().setPubSlingId("").setReqType(ReqType.ADD)
+ .addPaths("/content")
+ .setPkgId("pkgid")
+ .setPkgType("some_type")
+ .setPkgBinaryRef("myref").build();
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
new file mode 100644
index 0000000..aa2947a
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.google.protobuf.ByteString;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PackageViewerPluginTest {
+
+ @Spy
+ Topics topics = new Topics();
+
+ @Mock
+ private HttpServletRequest req;
+
+ @Mock
+ private HttpServletResponse res;
+
+ @Mock
+ MessagingProvider messagingProvider;
+
+ @Mock
+ ServletOutputStream servletOutputStream;
+
+ @Mock
+ PackageBrowser packageBrowser;
+
+ @InjectMocks
+ PackageViewerPlugin viewer;
+
+ private StringWriter outWriter;
+
+ @Before
+ public void before() throws IOException {
+ FullMessage<PackageMessage> msg1 = createPackageMsg(1l);
+ List<FullMessage<PackageMessage>> messages = Arrays.asList(msg1);
+ doReturn(messages).when(packageBrowser).getMessages(Mockito.eq(1l), Mockito.anyLong(), Mockito.any());
+ doReturn(messages).when(packageBrowser).getMessages(Mockito.eq(0l), Mockito.anyLong(), Mockito.any());
+ doReturn(emptyList()).when(packageBrowser).getMessages(Mockito.eq(2l), Mockito.anyLong(), Mockito.any());
+
+ outWriter = new StringWriter();
+ when(res.getWriter()).thenReturn(new PrintWriter(outWriter));
+ when(res.getOutputStream()).thenReturn(servletOutputStream);
+ }
+
+ @Test
+ public void testSimple() {
+ assertThat(viewer.getLabel(), equalTo("distpackages"));
+ assertThat(viewer.getTitle(), equalTo("Distribution Package Viewer"));
+ assertThat(viewer.getCategory(), equalTo("Sling"));
+ }
+
+ @Test
+ public void testPackageList() throws ServletException, IOException {
+ when(req.getPathInfo()).thenReturn("/distpackages");
+
+ viewer.renderContent(req, res);
+
+ String outString = outWriter.getBuffer().toString();
+ System.out.println(outString);
+ assertThat(outString,
+ containsString("<tr><td><a href=\"distpackages/1\">pkgid</a></td><td>1</td><td>ADD</td><td>[/content]</td></tr>"));
+ }
+
+ @Test
+ public void testGetPackage() throws ServletException, IOException {
+ when(req.getPathInfo()).thenReturn("/distpackages/1");
+
+ viewer.renderContent(req, res);
+
+ verify(packageBrowser).getMessages(Mockito.eq(1l), Mockito.eq(1l), Mockito.any());
+ }
+
+ @Test
+ public void testGetPackageNotFound() throws ServletException, IOException {
+ when(req.getPathInfo()).thenReturn("/distpackages/2");
+
+ viewer.renderContent(req, res);
+
+ verify(res).setStatus(Mockito.eq(404));
+ verify(packageBrowser).getMessages(Mockito.eq(2l), Mockito.eq(1l), Mockito.any());
+ }
+
+ @Test
+ public void testIsHtmlPackage() throws ServletException, IOException {
+ when(req.getPathInfo()).thenReturn("/distpackages/1");
+
+ assertThat(viewer.isHtmlRequest(req), equalTo(false));
+ }
+
+ @Test
+ public void testIsHtmlMain() throws ServletException, IOException {
+ when(req.getPathInfo()).thenReturn("/distpackages");
+
+ assertThat(viewer.isHtmlRequest(req), equalTo(true));
+ }
+
+ private FullMessage<PackageMessage> createPackageMsg(long offset) {
+ MessageInfo info = new TestMessageInfo("topic", 0 , offset, 0l);
+ PackageMessage message = PackageMessage.newBuilder()
+ .setPubSlingId("")
+ .setReqType(ReqType.ADD)
+ .addPaths("/content")
+ .setPkgId("pkgid")
+ .setPkgType("some_type")
+ .setPkgBinary(ByteString.copyFrom("package content", Charset.defaultCharset()))
+ .build();
+ FullMessage<PackageMessage> msg = new FullMessage<PackageMessage>(info, message);
+ return msg;
+ }
+
+}