SLING-8751 - Use longer timeout for download, support large binaries
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
index 4ab72a9..56ef007 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
@@ -18,9 +18,28 @@
*/
package org.apache.sling.distribution.journal.impl.shared;
+import static java.util.Collections.singletonMap;
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.time.Duration;
import java.util.List;
+import javax.annotation.Nonnull;
+import javax.jcr.Binary;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingProvider;
@@ -41,8 +60,40 @@
@Reference
private Topics topics;
+ @Reference
+ ResourceResolverFactory resolverFactory;
+
public List<FullMessage<PackageMessage>> getMessages(long startOffset, long numMessages, Duration timeout) {
LimitPoller poller = new LimitPoller(messagingProvider, topics.getPackageTopic(), startOffset, numMessages);
return poller.fetch(timeout);
}
+
+ public void writeTo(PackageMessage pkgMsg, OutputStream os) throws IOException {
+ try (ResourceResolver resolver = getServiceResolver("importer")) {
+ InputStream is = pkgStream(resolver, pkgMsg);
+ IOUtils.copy(is, os);
+ } catch (LoginException | DistributionException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ @Nonnull
+ public static InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
+ if (pkgMsg.hasPkgBinary()) {
+ return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
+ } else {
+ String pkgBinRef = pkgMsg.getPkgBinaryRef();
+ try {
+ ValueFactory factory = resolver.adaptTo(Session.class).getValueFactory();
+ Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
+ return binary.getStream();
+ } catch (RepositoryException e) {
+ throw new DistributionException(e.getMessage(), e);
+ }
+ }
+ }
+
+ private ResourceResolver getServiceResolver(String subService) throws LoginException {
+ return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
+ }
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
index aac8298..0ba1ce0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
@@ -47,7 +47,8 @@
WebConsoleConstants.PLUGIN_TITLE + "=" + PackageViewerPlugin.TITLE
})
public class PackageViewerPlugin extends AbstractWebConsolePlugin {
- private static final Duration TIMEOUT = Duration.ofMillis(1000);
+ private static final Duration BROWSE_TIMEOUT = Duration.ofMillis(1000);
+ private static final Duration DOWNLOAD_TIMEOUT = Duration.ofMillis(20000);
private static final int NOT_FOUND = 404;
private static final int MAX_NUM_MESSAGES = 100;
private static final long serialVersionUID = -3113699912185558439L;
@@ -90,7 +91,7 @@
private void renderPackageList(long startOffset, PrintWriter writer) {
writer.println("<table class=\"tablesorter nicetable noauto\">");
writer.println("<tr><th>Id</th><th>Offset</th><th>Type</th><th>Paths</th></tr>");
- List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(startOffset, MAX_NUM_MESSAGES, TIMEOUT);
+ List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(startOffset, MAX_NUM_MESSAGES, BROWSE_TIMEOUT);
msgs.stream().filter(this::notTestMessage).map(this::writeMsg).forEach(writer::println);
writer.println("</table>");
if (msgs.size() == MAX_NUM_MESSAGES) {
@@ -114,13 +115,13 @@
private void writePackage(Long offset, HttpServletResponse res) throws IOException {
log.info("Retrieving package with offset {}", offset);
- List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(offset, 1, TIMEOUT);
+ List<FullMessage<PackageMessage>> msgs = packageBrowser.getMessages(offset, 1, DOWNLOAD_TIMEOUT);
if (!msgs.isEmpty()) {
PackageMessage msg = msgs.iterator().next().getMessage();
res.setHeader("Content-Type", "application/octet-stream");
String filename = msg.getPkgId() + ".zip";
res.setHeader("Content-Disposition" , "inline; filename=\"" + filename + "\"");
- msg.getPkgBinary().writeTo(res.getOutputStream());
+ packageBrowser.writeTo(msg, res.getOutputStream());
} else {
res.setStatus(NOT_FOUND);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 3a63f20..1694e38 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -34,7 +34,6 @@
import static java.util.stream.Collectors.toSet;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
-import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.InputStream;
import java.util.Collections;
@@ -52,13 +51,10 @@
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import javax.jcr.Binary;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.ValueFactory;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -67,7 +63,6 @@
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
@@ -644,7 +639,7 @@
LOG.info("Importing paths " + pkgMsg.getPathsList());
InputStream pkgStream = null;
try {
- pkgStream = pkgStream(resolver, pkgMsg);
+ pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
packageBuilder.installPackage(resolver, pkgStream);
extractor.handle(resolver, pkgMsg.getPathsList());
} finally {
@@ -712,22 +707,6 @@
LOG.info("Sent status message {}", status);
}
- @Nonnull
- private InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
- if (pkgMsg.hasPkgBinary()) {
- return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
- } else {
- String pkgBinRef = pkgMsg.getPkgBinaryRef();
- try {
- ValueFactory factory = resolver.adaptTo(Session.class).getValueFactory();
- Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
- return binary.getStream();
- } catch (RepositoryException e) {
- throw new DistributionException(e.getMessage(), e);
- }
- }
- }
-
private void handleCommandMessage(MessageInfo info, CommandMessage message) {
if (subSlingId.equals(message.getSubSlingId()) && subAgentName.equals(message.getSubAgentName())) {
if (message.hasClearCommand()) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
new file mode 100644
index 0000000..14dcc19
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+
+import javax.jcr.Binary;
+import javax.jcr.Session;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PackageBrowserTest {
+ private static final String DATA = "test";
+
+ @Mock
+ ResourceResolverFactory resolverFactory;
+
+ @Mock
+ ResourceResolver resolver;
+
+ @Mock
+ Session session;
+
+ @Mock
+ ValueFactory valueFactory;
+
+ @Mock
+ Value value;
+
+ @InjectMocks
+ PackageBrowser packageBrowser;
+
+ @Mock
+ Binary binary;
+
+ @Test
+ public void testWriteTo() throws Exception {
+ when(resolverFactory.getServiceResourceResolver(Mockito.any())).thenReturn(resolver);
+ when(resolver.adaptTo(Session.class)).thenReturn(session);
+ when(session.getValueFactory()).thenReturn(valueFactory);
+ when(valueFactory.createValue(Mockito.any(SimpleReferenceBinary.class))).thenReturn(value);
+ when(value.getBinary()).thenReturn(binary);
+ ByteArrayInputStream is = new ByteArrayInputStream(DATA.getBytes(Charset.forName("utf-8")));
+ when(binary.getStream()).thenReturn(is);
+ PackageMessage pkgMsg = createPackageMsg(0l);
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ packageBrowser.writeTo(pkgMsg, bao);
+ String resultSt = bao.toString("utf-8");
+ assertThat(resultSt, equalTo(DATA));
+ }
+
+ private PackageMessage createPackageMsg(long offset) throws Exception {
+ return PackageMessage.newBuilder().setPubSlingId("").setReqType(ReqType.ADD)
+ .addPaths("/content")
+ .setPkgId("pkgid")
+ .setPkgType("some_type")
+ .setPkgBinaryRef("myref").build();
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
index 207a875..aa2947a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
@@ -146,7 +146,7 @@
}
private FullMessage<PackageMessage> createPackageMsg(long offset) {
- MessageInfo info = new TestMessageInfo(offset);
+ MessageInfo info = new TestMessageInfo("topic", 0 , offset, 0l);
PackageMessage message = PackageMessage.newBuilder()
.setPubSlingId("")
.setReqType(ReqType.ADD)
@@ -158,34 +158,5 @@
FullMessage<PackageMessage> msg = new FullMessage<PackageMessage>(info, message);
return msg;
}
-
- class TestMessageInfo implements MessageInfo {
-
- private long offset;
- public TestMessageInfo(long offset) {
- this.offset = offset;
- }
-
- @Override
- public String getTopic() {
- return topics.getPackageTopic();
- }
-
- @Override
- public int getPartition() {
- return 0;
- }
-
- @Override
- public long getOffset() {
- return this.offset;
- }
-
- @Override
- public long getCreateTime() {
- return 0;
- }
-
- }
}