SLING-8751 - Use longer timeout for download, support large binaries
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
index 4ab72a9..56ef007 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
@@ -18,9 +18,28 @@
  */
 package org.apache.sling.distribution.journal.impl.shared;
 
+import static java.util.Collections.singletonMap;
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.time.Duration;
 import java.util.List;
 
+import javax.annotation.Nonnull;
+import javax.jcr.Binary;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessagingProvider;
@@ -41,8 +60,40 @@
     @Reference
     private Topics topics;
     
+    @Reference
+    ResourceResolverFactory resolverFactory;
+    
     public List<FullMessage<PackageMessage>> getMessages(long startOffset, long numMessages, Duration timeout) {
         LimitPoller poller = new LimitPoller(messagingProvider, topics.getPackageTopic(), startOffset, numMessages);
         return poller.fetch(timeout);
     }
+    
+    public void writeTo(PackageMessage pkgMsg, OutputStream os) throws IOException {
+        try (ResourceResolver resolver = getServiceResolver("importer")) {
+            InputStream is = pkgStream(resolver, pkgMsg);
+            IOUtils.copy(is, os);
+        } catch (LoginException | DistributionException e) {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+    
+    @Nonnull
+    public static InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
+        if (pkgMsg.hasPkgBinary()) {
+            return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
+        } else {
+            String pkgBinRef = pkgMsg.getPkgBinaryRef();
+            try {
+                ValueFactory factory = resolver.adaptTo(Session.class).getValueFactory();
+                Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
+                return binary.getStream();
+            } catch (RepositoryException e) {
+                throw new DistributionException(e.getMessage(), e);
+            }
+        }
+    }
+    
+    private ResourceResolver getServiceResolver(String subService) throws LoginException {
+        return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
index aac8298..0ba1ce0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
@@ -47,7 +47,8 @@
         WebConsoleConstants.PLUGIN_TITLE + "=" + PackageViewerPlugin.TITLE
 })
 public class PackageViewerPlugin extends AbstractWebConsolePlugin {
-    private static final Duration TIMEOUT = Duration.ofMillis(1000);
+    private static final Duration BROWSE_TIMEOUT = Duration.ofMillis(1000);
+    private static final Duration DOWNLOAD_TIMEOUT = Duration.ofMillis(20000);
     private static final int NOT_FOUND = 404;
     private static final int MAX_NUM_MESSAGES = 100;
     private static final long serialVersionUID = -3113699912185558439L;
@@ -90,7 +91,7 @@
     private void renderPackageList(long startOffset, PrintWriter writer) {
         writer.println("<table class=\"tablesorter nicetable noauto\">");
         writer.println("<tr><th>Id</th><th>Offset</th><th>Type</th><th>Paths</th></tr>");
-        List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(startOffset, MAX_NUM_MESSAGES, TIMEOUT);
+        List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(startOffset, MAX_NUM_MESSAGES, BROWSE_TIMEOUT);
         msgs.stream().filter(this::notTestMessage).map(this::writeMsg).forEach(writer::println);
         writer.println("</table>");
         if (msgs.size() == MAX_NUM_MESSAGES) {
@@ -114,13 +115,13 @@
 
     private void writePackage(Long offset, HttpServletResponse res) throws IOException {
         log.info("Retrieving package with offset {}", offset);
-        List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(offset, 1, TIMEOUT);
+        List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(offset, 1, DOWNLOAD_TIMEOUT);
         if (!msgs.isEmpty()) {
             PackageMessage msg = msgs.iterator().next().getMessage();
             res.setHeader("Content-Type", "application/octet-stream");
             String filename = msg.getPkgId() + ".zip";
             res.setHeader("Content-Disposition" , "inline; filename=\"" + filename + "\"");
-            msg.getPkgBinary().writeTo(res.getOutputStream());
+            packageBrowser.writeTo(msg, res.getOutputStream());
         } else {
             res.setStatus(NOT_FOUND);
         }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 3a63f20..1694e38 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -34,7 +34,6 @@
 import static java.util.stream.Collectors.toSet;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
 
-import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.InputStream;
 import java.util.Collections;
@@ -52,13 +51,10 @@
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
-import javax.jcr.Binary;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.ValueFactory;
 
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -67,7 +63,6 @@
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
 import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
@@ -644,7 +639,7 @@
         LOG.info("Importing paths " + pkgMsg.getPathsList());
         InputStream pkgStream = null;
         try {
-            pkgStream = pkgStream(resolver, pkgMsg);
+            pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
             packageBuilder.installPackage(resolver, pkgStream);
             extractor.handle(resolver, pkgMsg.getPathsList());
         } finally {
@@ -712,22 +707,6 @@
         LOG.info("Sent status message {}", status);
     }
 
-    @Nonnull
-    private InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
-        if (pkgMsg.hasPkgBinary()) {
-            return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
-        } else {
-            String pkgBinRef = pkgMsg.getPkgBinaryRef();
-            try {
-                ValueFactory factory = resolver.adaptTo(Session.class).getValueFactory();
-                Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
-                return binary.getStream();
-            } catch (RepositoryException e) {
-                throw new DistributionException(e.getMessage(), e);
-            }
-        }
-    }
-
     private void handleCommandMessage(MessageInfo info, CommandMessage message) {
         if (subSlingId.equals(message.getSubSlingId()) && subAgentName.equals(message.getSubAgentName())) {
             if (message.hasClearCommand()) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
new file mode 100644
index 0000000..14dcc19
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+
+import javax.jcr.Binary;
+import javax.jcr.Session;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PackageBrowserTest {
+    private static final String DATA = "test";
+
+    @Mock
+    ResourceResolverFactory resolverFactory;
+
+    @Mock
+    ResourceResolver resolver;
+
+    @Mock
+    Session session;
+
+    @Mock
+    ValueFactory valueFactory;
+     
+    @Mock
+    Value value;
+
+    @InjectMocks
+    PackageBrowser packageBrowser;
+
+    @Mock
+    Binary binary;
+
+    @Test
+    public void testWriteTo() throws Exception {
+        when(resolverFactory.getServiceResourceResolver(Mockito.any())).thenReturn(resolver);
+        when(resolver.adaptTo(Session.class)).thenReturn(session);
+        when(session.getValueFactory()).thenReturn(valueFactory);
+        when(valueFactory.createValue(Mockito.any(SimpleReferenceBinary.class))).thenReturn(value);
+        when(value.getBinary()).thenReturn(binary);
+        ByteArrayInputStream is = new ByteArrayInputStream(DATA.getBytes(Charset.forName("utf-8")));
+        when(binary.getStream()).thenReturn(is);
+        PackageMessage pkgMsg = createPackageMsg(0l);
+        ByteArrayOutputStream bao = new ByteArrayOutputStream();
+        packageBrowser.writeTo(pkgMsg, bao);
+        String resultSt = bao.toString("utf-8");
+        assertThat(resultSt,  equalTo(DATA));
+    }
+
+    private PackageMessage createPackageMsg(long offset) throws Exception {
+        return PackageMessage.newBuilder().setPubSlingId("").setReqType(ReqType.ADD)
+                .addPaths("/content")
+                .setPkgId("pkgid")
+                .setPkgType("some_type")
+                .setPkgBinaryRef("myref").build();
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
index 207a875..aa2947a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
@@ -146,7 +146,7 @@
     }
 
     private FullMessage<PackageMessage> createPackageMsg(long offset) {
-        MessageInfo info = new TestMessageInfo(offset);
+        MessageInfo info = new TestMessageInfo("topic", 0 , offset, 0l);
         PackageMessage message = PackageMessage.newBuilder()
                 .setPubSlingId("")
                 .setReqType(ReqType.ADD)
@@ -158,34 +158,5 @@
         FullMessage<PackageMessage> msg = new FullMessage<PackageMessage>(info, message);
         return msg;
     }
-    
-    class TestMessageInfo implements MessageInfo {
-        
-        private long offset;
 
-        public TestMessageInfo(long offset) {
-            this.offset = offset;
-        }
-
-        @Override
-        public String getTopic() {
-            return topics.getPackageTopic();
-        }
-
-        @Override
-        public int getPartition() {
-            return 0;
-        }
-
-        @Override
-        public long getOffset() {
-            return this.offset;
-        }
-
-        @Override
-        public long getCreateTime() {
-            return 0;
-        }
-        
-    }
 }