| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.sling.servlets.post.impl.helper; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Calendar; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import javax.servlet.ServletContext; |
| import javax.servlet.http.Part; |
| |
| import org.apache.jackrabbit.JcrConstants; |
| import org.apache.sling.api.resource.ModifiableValueMap; |
| import org.apache.sling.api.resource.PersistenceException; |
| import org.apache.sling.api.resource.Resource; |
| import org.apache.sling.api.resource.ValueMap; |
| import org.apache.sling.servlets.post.Modification; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Supports streamed uploads including where the stream is made up of partial body parts. |
| * The behaviour is documented here https://cwiki.apache.org/confluence/display/SLING/Chunked+File+Upload+Support, adding the ability |
| * to define a body part with a Content-Range header on the part. Since the body parts are streamed there are some restrictions. If the |
| * length of a body part is missing from either the Content-Range header or a Content-Length header in the Part, then the length of the part is |
| * assumed to be the rest of the body to make the total length of the upload that specified in earlier Content-Range headers or a @Length property. |
| * |
| * When using only Content-Range headers (see the HTTP 1.1 spec) the Content-Range header must be complete and applied to the Part of the body. |
| * The length of the full file must be specified and be the same on all body parts, and the body parts must be sent in order. This is a restriction |
| * of the Sling Chunked File Upload protocol. When the total uploaded equals the file length the chunked uploads are processed to generate the final upload. |
| * |
| * When using request parameters, the most recent request parameters are used for @Completed, @Offset and @Length. When using request parameters if the |
| * Content-Length header is missing from the body Part, then the Body part is assumed to be the final body part. Then the total uploaded equals the value of |
| * the @Length parameter or a @Completed parameter is present, then the body parts are joined into a single body part. |
| * |
| * Consolidating body parts will cause all body parts to be read from the DS, which will incure 3x the IO of a non body part or chunked upload. For FS DS the IO may be from |
| * OS level disk cache. For other styles of DS the IO may consume more resources. Chunked or Body part uploads are not as efficient as whole body uploads and should |
| * be avoided wherever possible. This could be avoided if Oak would expose a seekable OutputStream, or allow writes to Binaries to specify and offset. |
| * |
| * |
| * |
| */ |
| public class StreamedChunk { |
| private static final String SLING_CHUNKS_LENGTH = "sling:length"; |
| private static final String SLING_FILE_LENGTH = "sling:fileLength"; |
| private static final String SLING_CHUNK_MIXIN = "sling:chunks"; |
| private static final String SLING_CHUNK_NT = "sling:chunk"; |
| private static final String SLING_OFFSET = "sling:offset"; |
| private static final String MT_APP_OCTET = "application/octet-stream"; |
| private final Logger LOGGER = LoggerFactory.getLogger(StreamedChunk.class); |
| |
| private final long offset; |
| private final long chunkLength; |
| private final long fileLength; |
| private final Part part; |
| private ServletContext servletContext; |
| private final boolean completed; |
| private final boolean chunked; |
| private final String chunkResourceName; |
| |
| /** |
| * Construct a chunk from the part and form fields. Once constructed it is immutable exposing a store method to store the chunk. |
| * If the part does not represent a chunk then the class behaves as if the chunk is a upload of 1 chunk (ie the whole file). |
| * @param part the current part, not read other than headers. |
| * @param formFields form fields encountered in teh request stream prior to this part. |
| * @param servletContext the current servlet context needed to resolve mimetypes. |
| */ |
| public StreamedChunk(Part part, Map<String, List<String>> formFields, ServletContext servletContext) { |
| this.part = part; |
| this.servletContext = servletContext; |
| |
| String contentRangeHeader = part.getHeader("Content-Range"); |
| String contentLengthHeader = part.getHeader("Content-Length"); |
| if ( contentRangeHeader != null ) { |
| ContentRange contentRange = new ContentRange(contentRangeHeader); |
| fileLength = contentRange.length; |
| offset = contentRange.offset; |
| chunkLength = contentRange.range; |
| chunked = true; |
| |
| } else if ( formFields.containsKey(part.getName()+"@Length") && formFields.containsKey(part.getName()+"@Offset")) { |
| fileLength = Long.parseLong(lastFrom(formFields.get(part.getName() + "@Length"))); |
| offset = Long.parseLong(lastFrom(formFields.get(part.getName() + "@Offset"))); |
| if (contentLengthHeader != null) { |
| chunkLength = Long.parseLong(contentLengthHeader); |
| } else if ( formFields.containsKey(part.getName() + "@PartLength")) { |
| chunkLength = Long.parseLong(lastFrom(formFields.get(part.getName() + "@PartLength"))); |
| } else { |
| // must assume the chunk contains all the data. |
| LOGGER.info("No part length specified assuming this is the final part of the chunked upload."); |
| chunkLength = fileLength - offset; |
| } |
| chunked = true; |
| } else { |
| offset = 0; |
| if (contentLengthHeader != null) { |
| fileLength = Long.parseLong(contentLengthHeader); |
| chunkLength = fileLength; |
| } else { |
| fileLength = -1; |
| chunkLength = -1; |
| } |
| chunked = false; |
| |
| } |
| chunkResourceName = "chunk_"+offset+"-"+(offset+chunkLength); |
| completed = ((offset+chunkLength) == fileLength) || formFields.containsKey(part.getName()+"@Completed"); |
| LOGGER.debug(" chunkResourceName {}, chunked {},completed {}, fileLength {}, chunkLength {}, offset {} ", |
| new Object[]{chunkResourceName, chunked, completed, fileLength, chunkLength, offset}); |
| } |
| |
| /** |
| * Store the chunk in a file resource under a jcr:content sub node. The method does not commit the resource resolver. The caller |
| * must perform the commit. If the stream is a stream of body parts and the parts are complete, the store operation will commit |
| * the body part but leave the consolitation of all parts to be committed by the caller. ie, always call resourceResolver.commit() after |
| * calling this method. |
| * @param fileResource the file request. |
| * @param changes changes that were made. |
| * @return the jcr:content sub node. |
| * @throws PersistenceException in case of persistence issues |
| */ |
| public Resource store(Resource fileResource, List<Modification> changes) throws PersistenceException { |
| Resource result = fileResource.getChild(JcrConstants.JCR_CONTENT); |
| if (result != null) { |
| updateState(result, changes); |
| } else { |
| result = initState(fileResource, changes); |
| } |
| storeChunk(result, changes); |
| return result; |
| } |
| |
| |
| /** |
| * The last element of strings. |
| * @param strings a non null non zero string array. |
| * @return the last element. |
| */ |
| private String lastFrom(List<String> strings) { |
| return strings.get(strings.size()-1); |
| } |
| |
| /** |
| * Update the state of the content resource to reflect a new body part being streamd. |
| * @param contentResource the content resource |
| * @param changes changes made. |
| * @throws IllegalStateException if the contentResource is not consistent with the part being streamed. |
| * @throws PersistenceException if the part cant be streamed. |
| */ |
| private void updateState(Resource contentResource, List<Modification> changes) throws IllegalStateException, PersistenceException { |
| final ModifiableValueMap vm = contentResource.adaptTo(ModifiableValueMap.class); |
| if ( vm == null ) { |
| throw new PersistenceException("Resource at " + contentResource.getPath() + " is not modifiable."); |
| } |
| vm.put(JcrConstants.JCR_LASTMODIFIED, Calendar.getInstance()); |
| vm.put(JcrConstants.JCR_MIMETYPE, getContentType(part)); |
| if (chunked) { |
| if ( vm.containsKey(SLING_FILE_LENGTH)) { |
| long previousFileLength = vm.get(SLING_FILE_LENGTH, Long.class); |
| if (previousFileLength != fileLength) { |
| throw new IllegalStateException("Chunk file length has changed while cunks were being uploaded expected " + previousFileLength + " chunk contained " + fileLength); |
| } |
| } |
| long previousChunksLength = 0; |
| if ( vm.containsKey(SLING_CHUNKS_LENGTH)) { |
| previousChunksLength = vm.get(SLING_CHUNKS_LENGTH, Long.class); |
| if (previousChunksLength != offset) { |
| throw new IllegalStateException("Chunks recieved out of order, was expecting chunk starting at " + offset + " found last chunk ending at " + previousChunksLength); |
| } |
| } |
| vm.put(SLING_CHUNKS_LENGTH, previousChunksLength + chunkLength); |
| vm.put(JcrConstants.JCR_MIXINTYPES, SLING_CHUNK_MIXIN); |
| } else { |
| try { |
| vm.put(JcrConstants.JCR_DATA, part.getInputStream()); |
| } catch (IOException e) { |
| throw new PersistenceException("Error while retrieving inputstream from request part.", e); |
| } |
| } |
| } |
| |
| /** |
| * Initialise the state of the jcr:content sub resource. |
| * @param fileResource the fileResource parent resource. |
| * @param changes changes that were made. |
| * @return the content resource. |
| * @throws PersistenceException |
| */ |
| private Resource initState(Resource fileResource, List<Modification> changes) throws PersistenceException { |
| Map<String, Object> resourceProps = new HashMap<>(); |
| resourceProps.put(JcrConstants.JCR_PRIMARYTYPE, JcrConstants.NT_RESOURCE); |
| resourceProps.put(JcrConstants.JCR_LASTMODIFIED, Calendar.getInstance()); |
| resourceProps.put(JcrConstants.JCR_MIMETYPE, getContentType(part)); |
| |
| if (chunked) { |
| resourceProps.put(SLING_CHUNKS_LENGTH, chunkLength); |
| resourceProps.put(SLING_FILE_LENGTH, fileLength); |
| resourceProps.put(JcrConstants.JCR_MIXINTYPES, SLING_CHUNK_MIXIN); |
| // add a zero size file to satisfy JCR constraints. |
| resourceProps.put(JcrConstants.JCR_DATA, new ByteArrayInputStream(new byte[0])); |
| } else { |
| try { |
| resourceProps.put(JcrConstants.JCR_DATA, part.getInputStream()); |
| } catch (IOException e) { |
| throw new PersistenceException("Error while retrieving inputstream from request part.", e); |
| } |
| } |
| |
| Resource result = fileResource.getResourceResolver().create(fileResource, JcrConstants.JCR_CONTENT, resourceProps); |
| for( String key : resourceProps.keySet()) { |
| changes.add(Modification.onModified(result.getPath() + '/' + key)); |
| } |
| return result; |
| } |
| |
| /** |
| * Store the chunk in a chunked resource. If not chunked does nothing. |
| * @param contentResource |
| * @param changes |
| * @throws PersistenceException |
| */ |
| private void storeChunk(Resource contentResource, List<Modification> changes) throws PersistenceException { |
| if (chunked) { |
| Map<String, Object> chunkProperties = new HashMap<>(); |
| chunkProperties.put(JcrConstants.JCR_PRIMARYTYPE, SLING_CHUNK_NT); |
| chunkProperties.put(SLING_OFFSET, offset); |
| try { |
| chunkProperties.put(JcrConstants.JCR_DATA, part.getInputStream()); |
| } catch (IOException e) { |
| throw new PersistenceException("Error while retrieving inputstream from request part.", e); |
| } |
| LOGGER.debug("Creating chunk at {} with properties {} ", chunkResourceName, chunkProperties); |
| Resource chunkResource = contentResource.getResourceResolver().create(contentResource, chunkResourceName, chunkProperties); |
| |
| |
| for (String key : chunkProperties.keySet()) { |
| changes.add(Modification.onModified(chunkResource.getPath() + '/' + key)); |
| } |
| |
| |
| processChunks(contentResource, changes); |
| } |
| |
| } |
| |
| /** |
| * process all chunks formed so far to create the final body. |
| * @param contentResource |
| * @param changes |
| * @throws PersistenceException |
| */ |
| private void processChunks(Resource contentResource, List<Modification> changes) throws PersistenceException { |
| if (completed) { |
| |
| // have to commit before processing chunks. |
| contentResource.getResourceResolver().commit(); |
| ModifiableValueMap vm = contentResource.adaptTo(ModifiableValueMap.class); |
| vm.put("jcr:data", getChunksInputStream(contentResource)); |
| // might have to commit before removing chunk data, depending on if the InputStream still works. |
| removeChunkData(contentResource, vm); |
| } |
| } |
| |
| /** |
| * remove chunk data. |
| * @param contentResource |
| * @param vm |
| * @throws PersistenceException |
| */ |
| private void removeChunkData(Resource contentResource, ModifiableValueMap vm) throws PersistenceException { |
| for ( Resource r : contentResource.getChildren()) { |
| if (r.isResourceType(SLING_CHUNK_NT)) { |
| r.getResourceResolver().delete(r); |
| } |
| } |
| vm.remove(SLING_CHUNKS_LENGTH); |
| vm.remove(SLING_FILE_LENGTH); |
| } |
| |
| /** |
| * Create an input stream that will read though the chunks in order. |
| * @param contentResource |
| * @return |
| */ |
| private InputStream getChunksInputStream(Resource contentResource) { |
| List<Resource> chunkResources = new ArrayList<>(); |
| for ( Resource r : contentResource.getChildren()) { |
| if (r.isResourceType(SLING_CHUNK_NT)) { |
| chunkResources.add(r); |
| } |
| } |
| Collections.sort(chunkResources, new Comparator<Resource>() { |
| @Override |
| public int compare(Resource o1, Resource o2) { |
| long offset1 = o1.adaptTo(ValueMap.class).get(SLING_OFFSET, Long.class); |
| long offset2 = o2.adaptTo(ValueMap.class).get(SLING_OFFSET, Long.class); |
| return (int) (offset1 - offset2); |
| } |
| }); |
| if ( LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Finishing Chunk upload at {} consolidating {} chunks into one file of ", |
| new Object[]{ |
| contentResource.getPath(), |
| chunkResources.size(), |
| contentResource.adaptTo(ValueMap.class).get(SLING_CHUNKS_LENGTH) |
| }); |
| LOGGER.debug("Content Resource Properties {} ", contentResource.adaptTo(ValueMap.class)); |
| for (Resource r : chunkResources) { |
| LOGGER.debug("Chunk {} properties {} ", r.getPath(), r.adaptTo(ValueMap.class)); |
| } |
| } |
| return new ResourceIteratorInputStream(chunkResources.iterator()); |
| } |
| |
| /** |
| * Get the content type of the part. |
| * @param part |
| * @return |
| */ |
| private String getContentType(final Part part) { |
| String contentType = part.getContentType(); |
| if (contentType != null) { |
| int idx = contentType.indexOf(';'); |
| if (idx > 0) { |
| contentType = contentType.substring(0, idx); |
| } |
| } |
| if (contentType == null || contentType.equals(MT_APP_OCTET)) { |
| // try to find a better content type |
| ServletContext ctx = this.servletContext; |
| if (ctx != null) { |
| contentType = ctx.getMimeType(part.getSubmittedFileName()); |
| } |
| if (contentType == null || contentType.equals(MT_APP_OCTET)) { |
| contentType = MT_APP_OCTET; |
| } |
| } |
| return contentType; |
| } |
| |
| /** |
| * Parses Content-Range headers according to spec https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html section 14.16 |
| * |
| * Content-Range = "Content-Range" ":" content-range-spec |
| * content-range-spec = byte-content-range-spec |
| * byte-content-range-spec = bytes-unit SP |
| * byte-range-resp-spec "/" |
| * ( instance-length | "*" ) |
| * byte-range-resp-spec = (first-byte-pos "-" last-byte-pos) |
| * | "*" |
| * instance-length = 1*DIGIT |
| * |
| * eg |
| * bytes 0-1233/1234 |
| * bytes 500-1233/1234 |
| * bytes 500-1233/* |
| * |
| * According to https://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.12 "bytes" is the only valid range unit. |
| */ |
| public static class ContentRange { |
| private static final Pattern rangePattern = Pattern.compile("bytes\\s([0-9]+)-([0-9]+)\\/([0-9]*)(\\**)"); |
| public long length; |
| public long offset; |
| public long range; |
| |
| |
| public ContentRange(String contentRangeHeader) { |
| Matcher m = rangePattern.matcher(contentRangeHeader); |
| if ( m.find() ) { |
| offset = Long.parseLong(m.group(1)); |
| long end = Long.parseLong(m.group(2)); |
| range = end-offset+1; |
| if ("*".equals(m.group(4))) { |
| length = -1; |
| } else { |
| length = Long.parseLong(m.group(3)); |
| if ( offset > length ) { |
| throw new IllegalArgumentException("Range header "+contentRangeHeader+" is invalid, offset beyond end."); |
| } |
| if ( end > length ) { |
| throw new IllegalArgumentException("Range header "+contentRangeHeader+" is invalid, range end beyond end."); |
| } |
| if ( range > length ) { |
| throw new IllegalArgumentException("Range header "+contentRangeHeader+" is invalid, range greater than length."); |
| } |
| } |
| if ( offset > end ) { |
| throw new IllegalArgumentException("Range header "+contentRangeHeader+" is invalid, offset beyond end of range."); |
| } |
| } else { |
| throw new IllegalArgumentException("Range header "+contentRangeHeader+" is invalid"); |
| } |
| } |
| } |
| } |