SLING-6027 Supporting Sling Chunked Upload over streamed uploads. Adding Content-Range range uploads as well to standardise the protocol

git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1759789 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/Protocols.md b/Protocols.md
new file mode 100644
index 0000000..01a91a9
--- /dev/null
+++ b/Protocols.md
@@ -0,0 +1,113 @@
+# Protocols
+
+Many of these protocols are documented elsewhere, however this document is with the source code and so hopefully uptodate.
+
+There is also published documentation at the following locations which may lag the code and this document. This document wont
+reproduce the information in the following locations, but will note differences when they are discovered.
+
+https://sling.apache.org/documentation/bundles/manipulating-content-the-slingpostservlet-servlets-post.html
+https://cwiki.apache.org/confluence/display/SLING/Chunked+File+Upload+Support
+
+# Request Processing.
+
+In general a request is processed by the Sling Engine encapsulating the data in Sling API interfaces. See the above locations
+for more information on how that works.
+
+# Uploads
+
+The default Sling Post servlets support both streamed and non streamed uploads. Non streamed uploads are processed by the
+Sling Engine request processing mechanisms before the information is passed to the the Servlet. While this is more flexible
+as the client developer does not have to think about the order of the parameters, it is less efficient as file bodies must
+be read completely before processing. This leads to more IO. Streaming uploads on the other hand read the request stream
+as it is send from the client and process it as it is read. This avoids some of the additional IO, but requires more attention
+to detain by a client developer as the parts of the post must arrive in teh correct order. Streaming is the preference method
+of upload where large files are involved.
+
+# Non streamed uploads.
+
+## Whole body uploads.
+
+See https://sling.apache.org/documentation/bundles/manipulating-content-the-slingpostservlet-servlets-post.html
+no known variance from the published documentation.
+
+## Sling Chunked body uploads.
+
+Note "Chunked" in this context is a special Sling protocol represented in request parameters intended to allow the client
+to upload chunks of a file to Sling. The protocol protects agains multiple clients perfoming an upload on the same resource, 
+but does not support multiple clients performing chunked uploads at the same time. Multiple requests are used, multiple chunks cannont
+generally be sent in a single request.
+See https://cwiki.apache.org/confluence/display/SLING/Chunked+File+Upload+Support.
+
+Chunked uploads are supported by writing the data to special sub nodes of the nt:resource node and when the chunks are complete the subnodes are
+read back in sequence creating the final binary. If chunks are send in the wrong order, the chunk is rejected.
+
+The definition of the chunknode differs from the published documentation.
+
+        // node type to store chunk
+        // offset: offset of chunk in file
+        // jcr:data: binary of chunk
+        [sling:chunk] > nt:hierarchyNode
+          primaryitem jcr:data
+          - sling:offset  (long) mandatory
+          - jcr:data (binary) mandatory
+          
+         //-----------------------------------------------------------------------------
+         // Mixin node type to identify that a node has chunks
+         // sling:fileLength : length of complete file
+         // sling:length : cumulative length of all uploaded chunks
+        [sling:chunks]
+          mixin
+          - sling:fileLength (long)
+          - sling:length (long)
+          + * (sling:chunk) multiple
+
+
+
+# Streamed uploads
+
+Streamed uploads were implemented under the following issues.
+
+https://issues.apache.org/jira/browse/SLING-5948
+https://issues.apache.org/jira/browse/SLING-6017
+https://issues.apache.org/jira/browse/SLING-6027
+
+For streaming to work the Sling Engine must not read the parts of the request. This is achieved by appending a request parameter 
+to the URL "uploadmode=stream" or adding a Header "Sling-uploadmode: stream". When that is done the Sling Engine does not 
+read the request, but provides a Iterator<Parts> in a request attribute (request-parts-iterator) that will provide each part as it
+is sent from the client. In this mode the SlingDefaultPost Servlet invokes the StreamedUploadOperation which iterates through the Parts.
+
+The StreamedUploadOperation does not support the standard Sling Post protocol. Any node that need to be created with non standard properties 
+or structure should be created in a seperate POST. If the node PrimaryType needs to be non standard, that POST operation should be before
+any upload is performed.
+
+## Full body streamed uploads.
+
+Full body streamed uploads are performed by a normal file upload. The name of the part is used as the File name. The location of the POST is the parent 
+resource which should already exist. If the name of the part is "*" then the supplied name of the file upload is used as the name of the resource. This is 
+the same as non streamed full body uploads. The behaviour is specific to Sling and may not be the same as adopeted by other systems.
+
+## Chunked streamed uploads.
+
+Chunked streamed upload are also supported with some subtle variations. Since the length of a body part cant be known until it has been fully streamed, a streaming
+chunked upload must specify the length of the body part prior to sending the body part. This can be achieved by in one of 3 ways. Sending a <name>@PartLength form field prior
+to the part to indicate the length of the Part. Setting a Content-Length header on the part itself, or using a Content-Range header on the part itself. If all of these are missing,
+processing assumes that the body part is the last body part in the file and truncates the file to that length, processing all parts so far.
+Unlike the non streamed chunked upload protocol, the streamed upload protocol can accept multiple body parts in 1 request. Each body part
+must be immediately preceded by the correct request parameters. (ie new @Offset if using Form fields).
+
+### Via request parameters.
+
+The standard Sling Chunked Upload protocol uses request parameters of the form <name>@Length indicating the final length
+of the file and <name>@Offset indicating the offset of the next body part and <name>@PartLength to indicate the 
+length of the part. <name>@PartLength is in addition to the nonstreamed protocol. <name> is the name of the body part 
+that follows. If any of these parameters are present in the request when a part is encountered, chunked processing is performed. 
+The chunk processing will detect when all the chunks have been send and perform final processing.
+
+### Via Content-Range headers
+
+Content Range headers are a part of the Http 1.1 Standard documented at https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html section 14.16. 
+If Content Range headers are provided on each Body part they are used in prefence to the propriatary Sling Chunk Upload protocol. This approach 
+is used by other resuable and chunked upload apis notably the Google Drive API. If content range headers are used, each part is self contained
+and no other request parameters or headers are are required. There is an expectation that a content range header will use the form
+"Content-Range: bytes 1234-2345/6789" specifying the full lenght and not the form  "Content-Range: bytes 1234-2345/*" which implies a length of 2345.
+
diff --git a/README.txt b/README.txt
index ca88f5b..594cc5d 100644
--- a/README.txt
+++ b/README.txt
@@ -25,3 +25,6 @@
 
 See the Subversion documentation for other source control features.
 
+
+
+
diff --git a/developer-tests/README.md b/developer-tests/README.md
new file mode 100644
index 0000000..bd318a5
--- /dev/null
+++ b/developer-tests/README.md
@@ -0,0 +1,12 @@
+# Developer test scripts.
+
+Testing POST behaviour can involve a lot of curl commands or browser work, expecially with file uploads. 
+This folder contains test scripts that verify the behaviour of this bundle in a running Sling instance. 
+They are not intended as a replacement for unit tests or integration tests, but rather a quick way to allow
+a developer to verify behaviour and be in control of the test being run.
+
+## testFileUploads.sh
+run in parent as sh developer-tests/testFileUploads.sh <testfile> and it will upload that file using non streamed, 
+ streamed, and streaming chunked protocols to the Sling server on localhost:8080 as admin:admin. It will also download 
+ the file and diff it against the local copy to ensure no changes. Chunks are 20kb each, so a large file will generate a
+ large number of small chunks. Dont try and test a GB file without editing the script to increase the chunk size.
\ No newline at end of file
diff --git a/developer-tests/testFileUploads.sh b/developer-tests/testFileUploads.sh
new file mode 100644
index 0000000..0179a5e
--- /dev/null
+++ b/developer-tests/testFileUploads.sh
@@ -0,0 +1,49 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -e
+mkdir -p target
+cd target
+echo Creating file via normal non streamed upload
+cp $1 P1060839.jpg
+curl -v -F P1060839.jpg=@P1060839.jpg http://admin:admin@localhost:8080/content/test
+curl http://admin:admin@localhost:8080/content/test/P1060839.jpg > P1060839_up.jpg 
+echo Checking normal upload.
+diff P1060839.jpg P1060839_up.jpg 
+
+echo Creating file via normal non streamed upload
+cp P1060839.jpg  SP1060839.jpg  
+curl -v -H "Sling-uploadmode: stream" -F key1=value1 -F *=@SP1060839.jpg -F PSP1060839.jpg=@SP1060839.jpg -F key2=admin2 http://admin:admin@localhost:8080/content/test
+curl http://admin:admin@localhost:8080/content/test/PSP1060839.jpg > PSP1060839_up.jpg 
+curl http://admin:admin@localhost:8080/content/test/SP1060839.jpg > SP1060839_up.jpg 
+diff P1060839.jpg PSP1060839_up.jpg
+diff P1060839.jpg SP1060839_up.jpg
+
+echo Checking chunked upload in 50k blocks.
+rm -f P1060839_chunk*
+split -b 20k P1060839.jpg  P1060839_chunk
+offset=0
+length=`wc -c P1060839.jpg | sed "s/ *\([0-9]*\) .*/\1/"`
+for i in P1060839_chunk*; do
+   size=`wc -c $i  | sed "s/ *\([0-9]*\) .*/\1/"`
+   curl -v -H "Sling-uploadmode: stream" -F CP1060839.jpg@Length=$length -F CP1060839.jpg@PartLength=$size -F CP1060839.jpg@Offset=$offset -F CP1060839.jpg=@$i http://admin:admin@localhost:8080/content/test
+   let offset=offset+size
+done
+curl http://admin:admin@localhost:8080/content/test/CP1060839.jpg > CP1060839_up.jpg
+diff -u P1060839.jpg CP1060839_up.jpg
diff --git a/pom.xml b/pom.xml
index ec97e97..600b6a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,8 @@
                 <excludes>
                   <exclude>src/main/resources/org/apache/sling/servlets/post/HtmlResponse.html</exclude>
                   <exclude>src/main/resources/org/apache/sling/servlets/post/HtmlNoGoBackResponse.html</exclude>
+                  <exclude>developer-tests/README.md</exclude>
+                  <exclude>Protocols.md</exclude>
                 </excludes>
               </configuration>
             </plugin>
diff --git a/src/main/java/org/apache/sling/servlets/post/impl/helper/ResourceIteratorInputStream.java b/src/main/java/org/apache/sling/servlets/post/impl/helper/ResourceIteratorInputStream.java
new file mode 100644
index 0000000..3e6603e
--- /dev/null
+++ b/src/main/java/org/apache/sling/servlets/post/impl/helper/ResourceIteratorInputStream.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sling.servlets.post.impl.helper;
+
+import org.apache.sling.api.resource.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+/**
+ * An input stream that reads from a list of resources that can be adapted into input streams.
+ */
+public class ResourceIteratorInputStream extends InputStream {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ResourceIteratorInputStream.class);
+    private int n;
+    private InputStream currentStream;
+    private final Iterator<Resource> iterator;
+    private int streamNo = 0;
+
+    public ResourceIteratorInputStream(Iterator<Resource> iterator) {
+        this.iterator = iterator;
+        while(iterator.hasNext()) {
+            currentStream = iterator.next().adaptTo(InputStream.class);
+            if ( currentStream != null) {
+                n = 0;
+                streamNo = 1;
+                return;
+            }
+        }
+        throw new IllegalArgumentException("Resource iterator does not contain any resources that can be adapted to an input stream.");
+    }
+
+    @Override
+    public int read() throws IOException {
+        int i = currentStream.read();
+        while ( i == -1 ) {
+            if ( iterator.hasNext()) {
+                LOGGER.debug("Stream {} provided {} bytes. ",streamNo, n);
+                currentStream = iterator.next().adaptTo(InputStream.class);
+                streamNo++;
+                n = 0;
+                if (currentStream != null) {
+                    i = currentStream.read();
+                }
+            } else {
+                LOGGER.debug("Last Stream {} provided {} bytes. ",streamNo, n);
+                return -1;
+            }
+        }
+        n++;
+        return i;
+    }
+}
diff --git a/src/main/java/org/apache/sling/servlets/post/impl/helper/StreamedChunk.java b/src/main/java/org/apache/sling/servlets/post/impl/helper/StreamedChunk.java
new file mode 100644
index 0000000..757c363
--- /dev/null
+++ b/src/main/java/org/apache/sling/servlets/post/impl/helper/StreamedChunk.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sling.servlets.post.impl.helper;
+
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.servlets.post.Modification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.Part;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Supports streamed uploads including where the stream is made up of partial body parts.
+ * The behaviour is documented here https://cwiki.apache.org/confluence/display/SLING/Chunked+File+Upload+Support, adding the ability
+ * to define a body part with a Content-Range header on the part. Since the body parts are streamed there are some restrictions. If the
+ * length of a body part is missing from either the Content-Range header or a Content-Length header in the Part, then the length of the part is
+ * assumed to be the rest of the body to make the total length of the upload that specified in earlier Content-Range headers or a @Length property.
+ *
+ * When using only Content-Range headers (see the HTTP 1.1 spec) the Content-Range header must be complete and applied to the Part of the body.
+ * The length of the full file must be specified and be the same on all body parts, and the body parts must be sent in order. This is a restriction
+ * of the Sling Chunked File Upload protocol. When the total uploaded equals the file length the chunked uploads are processed to generate the final upload.
+ *
+ * When using request parameters, the most recent request parameters are used for @Completed, @Offset and @Length. When using request parameters if the
+ * Content-Length header is missing from the body Part, then the Body part is assumed to be the final body part. Then the total uploaded equals the value of
+ * the @Length parameter or a @Completed parameter is present, then the body parts are joined into a single body part.
+ *
+ * Consolidating body parts will cause all body parts to be read from the DS, which will incure 3x the IO of a non body part or chunked upload. For FS DS the IO may be from
+ * OS level disk cache. For other styles of DS the IO may consume more resources. Chunked or Body part uploads are not as efficient as whole body uploads and should
+ * be avoided wherever possible. This could be avoided if Oak would expose a seekable OutputStream, or allow writes to Binaries to specify and offset.
+ *
+ *
+ *
+ */
+public class StreamedChunk {
+    private static final String NT_RESOURCE = "nt:resource";
+    private static final String JCR_LASTMODIFIED = "jcr:lastModified";
+    private static final String JCR_MIMETYPE = "jcr:mimeType";
+    private static final String JCR_DATA = "jcr:data";
+    private static final String JCR_CONTENT = "jcr:content";
+    private static final String JCR_PRIMARY_TYPE = "jcr:primaryType";
+    private static final String SLING_CHUNKS_LENGTH = "sling:length";
+    private static final String SLING_FILE_LENGTH = "sling:fileLength";
+    private static final String JCR_MIXIN_TYPES = "jcr:mixinTypes";
+    private static final String SLING_CHUNK_MIXIN = "sling:chunks";
+    private static final String SLING_CHUNK_NT = "sling:chunk";
+    private static final String SLING_OFFSET = "sling:offset";
+    private static final String MT_APP_OCTET = "application/octet-stream";
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamedChunk.class);
+
+    private final long offset;
+    private final long chunkLength;
+    private final long fileLength;
+    private final Part part;
+    private final Map<String, List<String>> formFields;
+    private ServletContext servletContext;
+    private final boolean completed;
+    private final boolean chunked;
+    private final String chunkResourceName;
+
+    /**
+     * Construct a chunk from the part and form fields. Once constructed it is immutable exposing a store method to store the chunk.
+     * If the part does not represent a chunk then the class behaves as if the chunk is a upload of 1 chunk (ie the whole file).
+     * @param part the current part, not read other than headers.
+     * @param formFields form fields encountered in teh request stream prior to this part.
+     * @param servletContext the current servlet context needed to resolve mimetypes.
+     */
+    public StreamedChunk(Part part, Map<String, List<String>> formFields, ServletContext servletContext) {
+        this.part = part;
+        this.formFields = formFields;
+        this.servletContext = servletContext;
+
+        String contentRangeHeader = part.getHeader("Content-Range");
+        String contentLengthHeader = part.getHeader("Content-Length");
+        if ( contentRangeHeader != null ) {
+            ContentRange contentRange = new ContentRange(contentRangeHeader);
+            fileLength = contentRange.length;
+            offset = contentRange.offset;
+            chunkLength = contentRange.range;
+            chunked = true;
+
+        } else if ( formFields.containsKey(part.getName()+"@Length") && formFields.containsKey(part.getName()+"@Offset")) {
+            fileLength = Long.parseLong(lastFrom(formFields.get(part.getName() + "@Length")));
+            offset = Long.parseLong(lastFrom(formFields.get(part.getName() + "@Offset")));
+            if (contentLengthHeader != null) {
+                chunkLength = Long.parseLong(contentLengthHeader);
+            } else if ( formFields.containsKey(part.getName() + "@PartLength")) {
+                chunkLength = Long.parseLong(lastFrom(formFields.get(part.getName() + "@PartLength")));
+            } else {
+                // must assume the chunk contains all the data.
+                LOGGER.info("No part length specified assuming this is the final part of the chunked upload.");
+                chunkLength = fileLength - offset;
+            }
+            chunked = true;
+        } else {
+            offset = 0;
+            if (contentLengthHeader != null) {
+                fileLength =  Long.parseLong(contentLengthHeader);
+                chunkLength = fileLength;
+            } else {
+                fileLength = -1;
+                chunkLength = -1;
+            }
+            chunked = false;
+
+        }
+        chunkResourceName = "chunk_"+offset+"-"+(offset+chunkLength);
+        completed = ((offset+chunkLength) == fileLength) || formFields.containsKey(part.getName()+"@Completed");
+        LOGGER.debug(" chunkResourceName {},  chunked {},completed {},  fileLength {}, chunkLength {}, offset {} ",
+                new Object[]{chunkResourceName, chunked, completed, fileLength, chunkLength, offset});
+    }
+
+    /**
+     * Store the chunk in a file resource under a jcr:content sub node. The method does not commit the resource resolver. The caller
+     * must perform the commit. If the stream is a stream of body parts and the parts are complete, the store operation will commit
+     * the body part but leave the consolitation of all parts to be committed by the caller. ie, always call resourceResolver.commit() after
+     * calling this method.
+     * @param fileResource the file request.
+     * @param changes changes that were made.
+     * @return the jcr:content sub node.
+     * @throws PersistenceException
+     */
+    public Resource store(Resource fileResource, List<Modification> changes) throws PersistenceException {
+        Resource result = fileResource.getChild(JCR_CONTENT);
+        if (result != null) {
+            updateState(result, changes);
+        } else {
+            result = initState(fileResource, changes);
+        }
+        storeChunk(result, changes);
+        return result;
+    }
+
+
+    /**
+     * The last element of strings.
+     * @param strings a non null non zero string array.
+     * @return the last element.
+     */
+    private String lastFrom(List<String> strings) {
+        return strings.get(strings.size()-1);
+    }
+
+    /**
+     * Update the state of the content resource to reflect a new body part being streamd.
+     * @param contentResource the content resource
+     * @param changes changes made.
+     * @throws IllegalStateException if the contentResource is not consistent with the part being streamed.
+     * @throws PersistenceException if the part cant be streamed.
+     */
+    private void updateState(Resource contentResource, List<Modification> changes) throws IllegalStateException, PersistenceException {
+        final ModifiableValueMap vm = contentResource.adaptTo(ModifiableValueMap.class);
+        if ( vm == null ) {
+            throw new PersistenceException("Resource at " + contentResource.getPath() + " is not modifiable.");
+        }
+        vm.put(JCR_LASTMODIFIED, Calendar.getInstance());
+        vm.put(JCR_MIMETYPE, getContentType(part));
+        if (chunked) {
+            if ( vm.containsKey(SLING_FILE_LENGTH)) {
+                long previousFileLength = (Long) vm.get(SLING_FILE_LENGTH, Long.class);
+                if (previousFileLength != fileLength) {
+                    throw new IllegalStateException("Chunk file length has changed while cunks were being uploaded expected " + previousFileLength + " chunk contained  " + fileLength);
+                }
+            }
+            long previousChunksLength = 0;
+            if ( vm.containsKey(SLING_CHUNKS_LENGTH)) {
+                previousChunksLength = (Long) vm.get(SLING_CHUNKS_LENGTH, Long.class);
+                if (previousChunksLength != offset) {
+                    throw new IllegalStateException("Chunks recieved out of order, was expecting chunk starting at " + offset + " found last chunk ending at " + previousChunksLength);
+                }
+            }
+            vm.put(SLING_CHUNKS_LENGTH, previousChunksLength + chunkLength);
+            vm.put(JCR_MIXIN_TYPES, SLING_CHUNK_MIXIN);
+        } else {
+            try {
+                vm.put(JCR_DATA, part.getInputStream());
+            } catch (IOException e) {
+                throw new PersistenceException("Error while retrieving inputstream from request part.", e);
+            }
+        }
+    }
+
+    /**
+     * Initialise the state of the jcr:content sub resource.
+     * @param fileResource the fileResource parent resource.
+     * @param changes changes that were made.
+     * @return the content resource.
+     * @throws PersistenceException
+     */
+    private Resource initState(Resource fileResource, List<Modification> changes) throws PersistenceException {
+        Map<String, Object> resourceProps = new HashMap<String, Object>();
+        resourceProps.put(JCR_PRIMARY_TYPE, NT_RESOURCE);
+        resourceProps.put(JCR_LASTMODIFIED, Calendar.getInstance());
+        resourceProps.put(JCR_MIMETYPE, getContentType(part));
+
+        if (chunked) {
+            resourceProps.put(SLING_CHUNKS_LENGTH, chunkLength);
+            resourceProps.put(SLING_FILE_LENGTH, fileLength);
+            resourceProps.put(JCR_MIXIN_TYPES, SLING_CHUNK_MIXIN);
+            // add a zero size file to satisfy JCR constraints.
+            resourceProps.put(JCR_DATA, new ByteArrayInputStream(new byte[0]));
+        } else {
+            try {
+                resourceProps.put(JCR_DATA, part.getInputStream());
+            } catch (IOException e) {
+                throw new PersistenceException("Error while retrieving inputstream from request part.", e);
+            }
+        }
+
+        Resource result = fileResource.getResourceResolver().create(fileResource, JCR_CONTENT, resourceProps);
+        for( String key : resourceProps.keySet()) {
+            changes.add(Modification.onModified(result.getPath() + '/' + key));
+        }
+        return result;
+    }
+
+    /**
+     * Store the chunk in a chunked resource. If not chunked does nothing.
+     * @param contentResource
+     * @param changes
+     * @throws PersistenceException
+     */
+    private void storeChunk(Resource contentResource, List<Modification> changes) throws PersistenceException {
+        if (chunked) {
+            Map<String, Object> chunkProperties = new HashMap<String, Object>();
+            chunkProperties.put(JCR_PRIMARY_TYPE, SLING_CHUNK_NT);
+            chunkProperties.put(SLING_OFFSET, offset);
+            try {
+                chunkProperties.put(JCR_DATA, part.getInputStream());
+            } catch (IOException e) {
+                throw new PersistenceException("Error while retrieving inputstream from request part.", e);
+            }
+            LOGGER.debug("Creating chunk at {} with properties {}  ", chunkResourceName, chunkProperties);
+            Resource chunkResource = contentResource.getResourceResolver().create(contentResource, chunkResourceName, chunkProperties);
+
+
+            for (String key : chunkProperties.keySet()) {
+                changes.add(Modification.onModified(chunkResource.getPath() + '/' + key));
+            }
+
+
+            processChunks(contentResource, changes);
+        }
+
+    }
+
+    /**
+     * process all chunks formed so far to create the final body.
+     * @param contentResource
+     * @param changes
+     * @throws PersistenceException
+     */
+    private void processChunks(Resource contentResource, List<Modification> changes) throws PersistenceException {
+        if (completed) {
+
+            // have to commit before processing chunks.
+            contentResource.getResourceResolver().commit();
+            ModifiableValueMap vm = contentResource.adaptTo(ModifiableValueMap.class);
+            vm.put("jcr:data", getChunksInputStream(contentResource));
+            // might have to commit before removing chunk data, depending on if the InputStream still works.
+            removeChunkData(contentResource, vm);
+        }
+    }
+
+    /**
+     * remove chunk data.
+     * @param contentResource
+     * @param vm
+     * @throws PersistenceException
+     */
+    private void removeChunkData(Resource contentResource, ModifiableValueMap vm) throws PersistenceException {
+        for ( Resource r : contentResource.getChildren()) {
+            if (r.isResourceType(SLING_CHUNK_NT)) {
+                r.getResourceResolver().delete(r);
+            }
+        }
+        vm.remove(SLING_CHUNKS_LENGTH);
+        vm.remove(SLING_FILE_LENGTH);
+    }
+
+    /**
+     * Create an input stream that will read though the chunks in order.
+     * @param contentResource
+     * @return
+     */
+    private InputStream getChunksInputStream(Resource contentResource) {
+        List<Resource> chunkResources = new ArrayList<Resource>();
+        for ( Resource r : contentResource.getChildren()) {
+            if (r.isResourceType(SLING_CHUNK_NT)) {
+                chunkResources.add(r);
+            }
+        }
+        Collections.sort(chunkResources, new Comparator<Resource>() {
+            @Override
+            public int compare(Resource o1, Resource o2) {
+                long offset1 = o1.adaptTo(ValueMap.class).get(SLING_OFFSET, Long.class);
+                long offset2 = o2.adaptTo(ValueMap.class).get(SLING_OFFSET, Long.class);
+                return (int) (offset1 - offset2);
+            }
+        });
+        if ( LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Finishing Chunk upload at {} consolidating {} chunks into one file of  ",
+                    new Object[]{
+                            contentResource.getPath(),
+                            chunkResources.size(),
+                            contentResource.adaptTo(ValueMap.class).get(SLING_CHUNKS_LENGTH)
+                    });
+            LOGGER.debug("Content Resource Properties {} ", contentResource.adaptTo(ValueMap.class));
+            for (Resource r : chunkResources) {
+                LOGGER.debug("Chunk {} properties {} ", r.getPath(), r.adaptTo(ValueMap.class));
+            }
+        }
+        return new ResourceIteratorInputStream(chunkResources.iterator());
+    }
+
+    /**
+     * Get the content type of the part.
+     * @param part
+     * @return
+     */
+    private String getContentType(final Part part) {
+        String contentType = part.getContentType();
+        if (contentType != null) {
+            int idx = contentType.indexOf(';');
+            if (idx > 0) {
+                contentType = contentType.substring(0, idx);
+            }
+        }
+        if (contentType == null || contentType.equals(MT_APP_OCTET)) {
+            // try to find a better content type
+            ServletContext ctx = this.servletContext;
+            if (ctx != null) {
+                contentType = ctx.getMimeType(part.getSubmittedFileName());
+            }
+            if (contentType == null || contentType.equals(MT_APP_OCTET)) {
+                contentType = MT_APP_OCTET;
+            }
+        }
+        return contentType;
+    }
+
+    /**
+     * Parses Content-Range headers according to spec https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html section 14.16
+     *
+     *     Content-Range = "Content-Range" ":" content-range-spec
+     * content-range-spec      = byte-content-range-spec
+     * byte-content-range-spec = bytes-unit SP
+     *                           byte-range-resp-spec "/"
+     *                           ( instance-length | "*" )
+     * byte-range-resp-spec = (first-byte-pos "-" last-byte-pos)
+     *                          | "*"
+     * instance-length = 1*DIGIT
+     *
+     * eg
+     * bytes 0-1233/1234
+     * bytes 500-1233/1234
+     * bytes 500-1233/*
+     *
+     * According to https://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.12 "bytes" is the only valid range unit.
+     */
+    public static class ContentRange {
+        private static final Pattern rangePattern = Pattern.compile("bytes\\s([0-9]+)-([0-9]+)\\/([0-9]*)(\\**)");
+        public long length;
+        public long offset;
+        public long range;
+
+
+        public ContentRange(String contentRangeHeader) {
+            Matcher m = rangePattern.matcher(contentRangeHeader);
+            if ( m.find() ) {
+                offset = Long.parseLong(m.group(1));
+                long end = Long.parseLong(m.group(2));
+                range = end-offset+1;
+                if ("*".equals(m.group(4))) {
+                    length = -1;
+                } else {
+                    length = Long.parseLong(m.group(3));
+                    if ( offset > length ) {
+                        throw new IllegalArgumentException("Range header "+contentRangeHeader+" is invalid, offset beyond end.");
+                    }
+                    if ( end > length ) {
+                        throw new IllegalArgumentException("Range header "+contentRangeHeader+" is invalid, range end beyond end.");
+                    }
+                    if ( range > length ) {
+                        throw new IllegalArgumentException("Range header "+contentRangeHeader+" is invalid, range greater than length.");
+                    }
+                }
+                if ( offset > end ) {
+                    throw new IllegalArgumentException("Range header "+contentRangeHeader+" is invalid, offset beyond end of range.");
+                }
+            } else {
+                throw new IllegalArgumentException("Range header "+contentRangeHeader+" is invalid");
+            }
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/servlets/post/impl/operations/StreamedUploadOperation.java b/src/main/java/org/apache/sling/servlets/post/impl/operations/StreamedUploadOperation.java
index a227601..067ba20 100644
--- a/src/main/java/org/apache/sling/servlets/post/impl/operations/StreamedUploadOperation.java
+++ b/src/main/java/org/apache/sling/servlets/post/impl/operations/StreamedUploadOperation.java
@@ -20,7 +20,6 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.util.Text;
 import org.apache.sling.api.SlingHttpServletRequest;
-import org.apache.sling.api.resource.ModifiableValueMap;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -28,6 +27,7 @@
 import org.apache.sling.servlets.post.AbstractPostOperation;
 import org.apache.sling.servlets.post.Modification;
 import org.apache.sling.servlets.post.PostResponse;
+import org.apache.sling.servlets.post.impl.helper.StreamedChunk;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +36,6 @@
 import javax.servlet.http.Part;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -58,12 +57,6 @@
 public class StreamedUploadOperation extends AbstractPostOperation {
     private static final Logger LOG = LoggerFactory.getLogger(StreamedUploadOperation.class);
     public static final String NT_FILE = "nt:file";
-    public static final String NT_RESOURCE = "nt:resource";
-    public static final String JCR_LASTMODIFIED = "jcr:lastModified";
-    public static final String JCR_MIMETYPE = "jcr:mimeType";
-    public static final String JCR_DATA = "jcr:data";
-    private static final String MT_APP_OCTET = "application/octet-stream";
-    private static final String JCR_CONTENT = "jcr:content";
     private ServletContext servletContext;
 
     public void setServletContext(final ServletContext servletContext) {
@@ -167,33 +160,10 @@
         }
 
 
-
-        Map<String, Object> resourceProps = new HashMap<String, Object>();
-        resourceProps.put("jcr:primaryType", NT_RESOURCE);
-        resourceProps.put(JCR_LASTMODIFIED, Calendar.getInstance());
-        // TODO: Should all the formFields be added to the prop map ?
-        resourceProps.put(JCR_MIMETYPE, getContentType(part));
-        try {
-            resourceProps.put(JCR_DATA, part.getInputStream());
-        } catch (IOException e) {
-            throw new PersistenceException("Error while retrieving inputstream from request part.", e);
-        }
-        Resource result = fileResource.getChild(JCR_CONTENT);
-        if ( result != null ) {
-            final ModifiableValueMap vm = result.adaptTo(ModifiableValueMap.class);
-            if ( vm == null ) {
-                throw new PersistenceException("Resource at " + fileResource.getPath() + '/' + JCR_CONTENT + " is not modifiable.");
-            }
-            vm.putAll(resourceProps);
-        } else {
-            result = parentResource.getResourceResolver().create(fileResource, JCR_CONTENT, resourceProps);
-        }
-        // Commit must be called to perform to cause streaming so the next part can be found.
+        StreamedChunk chunk = new StreamedChunk(part, formFields, servletContext);
+        Resource result = chunk.store(fileResource, changes);
         result.getResourceResolver().commit();
 
-        for( String key : resourceProps.keySet()) {
-            changes.add(Modification.onModified(result.getPath() + '/' + key));
-        }
     }
 
     /**
@@ -235,31 +205,6 @@
     }
 
 
-    /**
-     * Get the content type of the part.
-     * @param part
-     * @return
-     */
-    private String getContentType(final Part part) {
-        String contentType = part.getContentType();
-        if (contentType != null) {
-            int idx = contentType.indexOf(';');
-            if (idx > 0) {
-                contentType = contentType.substring(0, idx);
-            }
-        }
-        if (contentType == null || contentType.equals(MT_APP_OCTET)) {
-            // try to find a better content type
-            ServletContext ctx = this.servletContext;
-            if (ctx != null) {
-                contentType = ctx.getMimeType(part.getSubmittedFileName());
-            }
-            if (contentType == null || contentType.equals(MT_APP_OCTET)) {
-                contentType = MT_APP_OCTET;
-            }
-        }
-        return contentType;
-    }
 
 
 
diff --git a/src/test/java/org/apache/sling/servlets/post/impl/helper/ResourceIteratorInputStreamTest.java b/src/test/java/org/apache/sling/servlets/post/impl/helper/ResourceIteratorInputStreamTest.java
new file mode 100644
index 0000000..e91a020
--- /dev/null
+++ b/src/test/java/org/apache/sling/servlets/post/impl/helper/ResourceIteratorInputStreamTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.servlets.post.impl.helper;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.commons.testing.sling.MockResource;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by ieb on 06/09/2016.
+ */
+public class ResourceIteratorInputStreamTest {
+
+    @Test
+    public void test() throws IOException {
+        List<Resource> resources = new ArrayList<Resource>();
+        for (int i = 0; i < 10; i++ ) {
+            final int initialState = i;
+            final InputStream in = new InputStream() {
+                private int state = initialState;
+                @Override
+                public int read() throws IOException {
+                    return state--;
+                }
+            };
+            resources.add(new MockResource(null,null,null){
+                @Override
+                public <AdapterType> AdapterType adaptTo(Class<AdapterType> type) {
+                    if (InputStream.class.equals(type)) {
+                        return (AdapterType) in;
+                    }
+                    return super.adaptTo(type);
+                }
+
+                @Override
+                public String getName() {
+                    return "chunk-"+(initialState*100)+"-"+(((initialState+1)*100)-1);
+                }
+            });
+        }
+        ResourceIteratorInputStream resourceIteratorInputStream = new ResourceIteratorInputStream(resources.iterator());
+        int expected = 0;
+        int cycle = 0;
+        for(int i = resourceIteratorInputStream.read(); i >= 0; i = resourceIteratorInputStream.read()) {
+            Assert.assertEquals(expected, i);
+            if ( expected == 0 ) {
+                cycle++;
+                expected=cycle;
+            } else {
+                expected--;
+            }
+        }
+        Assert.assertEquals(10,cycle);
+    }
+}
diff --git a/src/test/java/org/apache/sling/servlets/post/impl/helper/StreamedChunkTest.java b/src/test/java/org/apache/sling/servlets/post/impl/helper/StreamedChunkTest.java
new file mode 100644
index 0000000..414704c
--- /dev/null
+++ b/src/test/java/org/apache/sling/servlets/post/impl/helper/StreamedChunkTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.servlets.post.impl.helper;
+
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Created by ieb on 06/09/2016.
+ */
+public class StreamedChunkTest {
+
+    @Test
+    public void testContentRange() {
+        checkRange("bytes 0-1234/1235", 0, 1235, 1235);
+        checkRange("bytes 10-123/1235", 10,123-10+1,1235);
+        checkRange("bytes 12-123/*", 12, 123 - 12+1, -1);
+        checkInvalidRange("byte 10-123/1234"); // byte is not valid.
+        checkInvalidRange("bytes 1000-123/1234"); // offset before end
+        checkInvalidRange("bytes 1000-12300/1234"); // end before length
+        checkInvalidRange("bytes 1000-12300/big"); // big not valid
+        checkInvalidRange("bytes 1000-12300/"); // no length
+        checkInvalidRange("bytes 1000-12300"); // no length
+    }
+
+    private void checkInvalidRange(String rangeHeader) {
+        try {
+            StreamedChunk.ContentRange cr = new StreamedChunk.ContentRange(rangeHeader);
+            Assert.fail("Should have rejected "+rangeHeader);
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    private void checkRange(String rangeHeader, long offset, long range, long length) {
+        StreamedChunk.ContentRange cr = new StreamedChunk.ContentRange(rangeHeader);
+        Assert.assertEquals(offset,cr.offset);
+        Assert.assertEquals(range,cr.range);
+        Assert.assertEquals(length,cr.length);
+
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/servlets/post/impl/operations/MockPart.java b/src/test/java/org/apache/sling/servlets/post/impl/operations/MockPart.java
index 01cc13a..a48d753 100644
--- a/src/test/java/org/apache/sling/servlets/post/impl/operations/MockPart.java
+++ b/src/test/java/org/apache/sling/servlets/post/impl/operations/MockPart.java
@@ -34,12 +34,13 @@
     private String contentType;
     private InputStream inputStream;
 
-    public MockPart(String name, String contentType, String submittedFileName, long size, InputStream inputStream) {
+    public MockPart(String name, String contentType, String submittedFileName, long size, InputStream inputStream, Map<String, Object> headers) {
         this.name = name;
         this.contentType = contentType;
         this.submittedFileName = submittedFileName;
         this.size = size;
         this.inputStream = inputStream;
+        this.headers = headers;
     }
 
     @Override
diff --git a/src/test/java/org/apache/sling/servlets/post/impl/operations/MockRealResource.java b/src/test/java/org/apache/sling/servlets/post/impl/operations/MockRealResource.java
index ae1d5eb..a6e9eb7 100644
--- a/src/test/java/org/apache/sling/servlets/post/impl/operations/MockRealResource.java
+++ b/src/test/java/org/apache/sling/servlets/post/impl/operations/MockRealResource.java
@@ -24,6 +24,9 @@
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ValueMap;
 
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -34,21 +37,28 @@
     private final ResourceResolver resourceResolver;
     private final String path;
     private final String resourceType;
+    private final String name;
+    private final String parentPath;
     private MockValueMap properties = new MockValueMap();
 
     public MockRealResource(ResourceResolver resourceResolver, String path, String resourceType) {
         this.resourceResolver = resourceResolver;
         this.path = path;
         this.resourceType = resourceType;
+        this.name = path.substring(path.lastIndexOf('/'));
+        this.parentPath = path.substring(0,path.lastIndexOf('/'));
     }
 
     public MockRealResource(ResourceResolver resourceResolver, String path, String resourceType, Map<String, Object> properties) {
         this.resourceResolver = resourceResolver;
         this.path = path;
+        this.name = path.substring(path.lastIndexOf('/')+1);
+        this.parentPath = path.substring(0,path.lastIndexOf('/'));
         this.resourceType = resourceType;
         this.properties.putAll(properties);
     }
 
+
     @Override
     public String getPath() {
         return path;
@@ -56,22 +66,21 @@
 
     @Override
     public String getName() {
-        return null;
+        return name;
     }
 
     @Override
     public Resource getParent() {
-        return null;
+        return resourceResolver.getResource(parentPath);
     }
 
     @Override
     public Iterator<Resource> listChildren() {
-        return null;
+        return resourceResolver.listChildren(this);
     }
 
-    @Override
-    public Iterable<Resource> getChildren() {
-        return null;
+    @Override    public Iterable<Resource> getChildren() {
+        return resourceResolver.getChildren(this);
     }
 
     @Override
@@ -91,7 +100,7 @@
 
     @Override
     public boolean isResourceType(String s) {
-        return false;
+        return s.equals(resourceType);
     }
 
     @Override
@@ -109,6 +118,18 @@
         if (ValueMap.class.isAssignableFrom(aClass)) {
             return (AdapterType) properties;
         }
+        if (InputStream.class.isAssignableFrom(aClass) && properties.containsKey("jcr:data")) {
+            Object o = properties.get("jcr:data");
+            if (o instanceof InputStream) {
+                return (AdapterType) o;
+            } else {
+                try {
+                    return (AdapterType) new ByteArrayInputStream(String.valueOf(properties.get("jcr:data")).getBytes("UTF-8"));
+                } catch (UnsupportedEncodingException e) {
+                    throw new RuntimeException("Cant convert UTF-8 to byte[]");
+                }
+            }
+        }
         return null;
     }
 }
diff --git a/src/test/java/org/apache/sling/servlets/post/impl/operations/StreamingUploadOperationTest.java b/src/test/java/org/apache/sling/servlets/post/impl/operations/StreamingUploadOperationTest.java
index 33d98da..8dd2a62 100644
--- a/src/test/java/org/apache/sling/servlets/post/impl/operations/StreamingUploadOperationTest.java
+++ b/src/test/java/org/apache/sling/servlets/post/impl/operations/StreamingUploadOperationTest.java
@@ -48,6 +48,7 @@
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -73,7 +74,7 @@
     @Test
     public void test() throws RepositoryException, UnsupportedEncodingException {
         List<Modification> changes = new ArrayList<Modification>();
-        PostResponse ressponse = new AbstractPostResponse() {
+        PostResponse response = new AbstractPostResponse() {
             @Override
             protected void doSend(HttpServletResponse response) throws IOException {
 
@@ -91,11 +92,11 @@
         };
 
         List<Part> partsList = new ArrayList<Part>();
-        partsList.add(new MockPart("formfield1", null, null, 0, new ByteArrayInputStream("testformfield1".getBytes("UTF-8"))));
-        partsList.add(new MockPart("formfield2", null, null, 0, new ByteArrayInputStream("testformfield2".getBytes("UTF-8"))));
-        partsList.add(new MockPart("test1.txt", "text/plain", "test1bad.txt", 4, new ByteArrayInputStream("test".getBytes("UTF-8"))));
-        partsList.add(new MockPart("*", "text/plain2", "test2.txt", 8, new ByteArrayInputStream("test1234".getBytes("UTF-8"))));
-        partsList.add(new MockPart("badformfield2", null, null, 0, new ByteArrayInputStream("testbadformfield2".getBytes("UTF-8"))));
+        partsList.add(new MockPart("formfield1", null, null, 0, new ByteArrayInputStream("testformfield1".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        partsList.add(new MockPart("formfield2", null, null, 0, new ByteArrayInputStream("testformfield2".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        partsList.add(new MockPart("test1.txt", "text/plain", "test1bad.txt", 4, new ByteArrayInputStream("test".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        partsList.add(new MockPart("*", "text/plain2", "test2.txt", 8, new ByteArrayInputStream("test1234".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        partsList.add(new MockPart("badformfield2", null, null, 0, new ByteArrayInputStream("testbadformfield2".getBytes("UTF-8")), Collections.EMPTY_MAP));
         final Iterator<Part> partsIterator = partsList.iterator();
         final Map<String, Resource> repository = new HashMap<String, Resource>();
         final ResourceResolver resourceResolver = new MockResourceResolver() {
@@ -191,7 +192,7 @@
                 return resourceResolver;
             }
         };
-        streamedUplodOperation.doRun(request, ressponse, changes);
+        streamedUplodOperation.doRun(request, response, changes);
 
 
         {
@@ -241,4 +242,439 @@
 
 
     }
+
+    @Test
+    public void testParts() throws RepositoryException, UnsupportedEncodingException {
+        List<Modification> changes = new ArrayList<Modification>();
+        PostResponse response = new AbstractPostResponse() {
+            @Override
+            protected void doSend(HttpServletResponse response) throws IOException {
+
+            }
+
+            @Override
+            public void onChange(String type, String... arguments) {
+
+            }
+
+            @Override
+            public String getPath() {
+                return "/test/upload/location";
+            }
+        };
+
+        List<Part> partsList = new ArrayList<Part>();
+        partsList.add(new MockPart("test1.txt@Length", null, null, 0, new ByteArrayInputStream("8".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        partsList.add(new MockPart("test1.txt@Offset", null, null, 0, new ByteArrayInputStream("0".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        partsList.add(new MockPart(
+                "test1.txt",
+                "text/plain",
+                "test1bad.txt",
+                4,
+                new ByteArrayInputStream("test".getBytes("UTF-8")),
+                mapOf("Content-Length", "4")));
+        partsList.add(new MockPart("test1.txt@Offset", null, null, 0, new ByteArrayInputStream("4".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        partsList.add(new MockPart(
+                "test1.txt",
+                "text/plain",
+                "test1bad.txt",
+                4,
+                new ByteArrayInputStream("part".getBytes("UTF-8")),
+                mapOf("Content-Length", "4")));
+        partsList.add(new MockPart("*", "text/plain2", "test2.txt", 8, new ByteArrayInputStream("test1234".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        partsList.add(new MockPart("badformfield2", null, null, 0, new ByteArrayInputStream("testbadformfield2".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        final Iterator<Part> partsIterator = partsList.iterator();
+        final Map<String, Resource> repository = new HashMap<String, Resource>();
+        final ResourceResolver resourceResolver = new MockResourceResolver() {
+            @Override
+            public Resource getResource(String path) {
+
+                Resource resource = repository.get(path);
+
+                if ( resource == null ) {
+                    if ( "/test/upload/location".equals(path)) {
+                        resource =  new MockRealResource(this, path, "sling:Folder");
+                        repository.put(path,resource);
+                        LOG.debug("Created {} ", path);
+
+                    }
+                }
+                LOG.debug("Resource {} is {} {}", path, resource, ResourceUtil.isSyntheticResource(resource));
+                return resource;
+            }
+
+
+
+
+            @Override
+            public Iterable<Resource> getChildren(Resource resource) {
+
+                List<Resource> children = new ArrayList<Resource>();
+                for(Map.Entry<String, Resource> e : repository.entrySet()) {
+                    if (isChild(resource.getPath(), e.getKey())) {
+                        children.add(e.getValue());
+                    }
+                }
+                return children;
+            }
+
+            private boolean isChild(String path, String key) {
+                if ( key.length() > path.length() && key.startsWith(path)) {
+                    return !key.substring(path.length()+1).contains("/");
+                }
+                return false;
+            }
+
+            @Override
+            public Iterator<Resource> listChildren(Resource parent) {
+                return getChildren(parent).iterator();
+            }
+
+            @Override
+            public void delete(Resource resource) throws PersistenceException {
+
+            }
+
+            @Override
+            public Resource create(Resource resource, String s, Map<String, Object> map) throws PersistenceException {
+                Resource childResource = resource.getChild(s);
+                if ( childResource != null) {
+                    throw new IllegalArgumentException("Child "+s+" already exists ");
+                }
+                String resourceType = (String)map.get("sling:resourceType");
+                if ( resourceType == null) {
+                    resourceType = (String)map.get("jcr:primaryType");
+                }
+                if ( resourceType == null) {
+                    LOG.warn("Resource type null for {} {} ", resource,  resource.getPath()+"/"+s);
+                }
+                Resource newResource = new MockRealResource(this, resource.getPath()+"/"+s, resourceType, map);
+                repository.put(newResource.getPath(), newResource);
+                LOG.debug("Created Resource {} ", newResource.getPath());
+                return newResource;
+            }
+
+            @Override
+            public void revert() {
+
+            }
+
+            @Override
+            public void commit() throws PersistenceException {
+                LOG.debug("Committing");
+                for(Map.Entry<String, Resource> e : repository.entrySet()) {
+                    LOG.debug("Committing {} ", e.getKey());
+                    Resource r = e.getValue();
+                    ModifiableValueMap vm = r.adaptTo(ModifiableValueMap.class);
+                    for (Map.Entry<String, Object> me : vm.entrySet()) {
+                        if (me.getValue() instanceof InputStream) {
+                            try {
+                                String value = IOUtils.toString((InputStream) me.getValue());
+                                LOG.debug("Converted {} {}  ", me.getKey(), value);
+                                vm.put(me.getKey(), value);
+
+                            } catch (IOException e1) {
+                                throw new PersistenceException("Failed to commit input stream", e1);
+                            }
+                        }
+                    }
+                    LOG.debug("Converted {} ", vm);
+                }
+                LOG.debug("Comittted {} ", repository);
+
+
+            }
+
+            @Override
+            public boolean hasChanges() {
+                return false;
+            }
+        };
+
+        SlingHttpServletRequest request = new MockSlingHttpServlet3Request(null, null, null, null, null) {
+            @Override
+            public Object getAttribute(String name) {
+                if ( "request-parts-iterator".equals(name)) {
+                    return partsIterator;
+                }
+                return super.getAttribute(name);
+            }
+
+            @Override
+            public ResourceResolver getResourceResolver() {
+                return resourceResolver;
+            }
+        };
+        streamedUplodOperation.doRun(request, response, changes);
+
+
+        {
+            Resource r = repository.get("/test/upload/location/test1.txt");
+            Assert.assertNotNull(r);
+            ValueMap m = r.adaptTo(ValueMap.class);
+            Assert.assertNotNull(m);
+
+
+            Assert.assertEquals("nt:file", m.get("jcr:primaryType"));
+
+        }
+        {
+            Resource r = repository.get("/test/upload/location/test1.txt/jcr:content");
+            Assert.assertNotNull(r);
+            ValueMap m = r.adaptTo(ValueMap.class);
+            Assert.assertNotNull(m);
+
+            Assert.assertEquals("nt:resource", m.get("jcr:primaryType"));
+            Assert.assertTrue(m.get("jcr:lastModified") instanceof Calendar);
+            Assert.assertEquals("text/plain", m.get("jcr:mimeType"));
+            Assert.assertEquals("testpart", m.get("jcr:data"));
+
+        }
+        {
+            Resource r = repository.get("/test/upload/location/test2.txt");
+            Assert.assertNotNull(r);
+            ValueMap m = r.adaptTo(ValueMap.class);
+            Assert.assertNotNull(m);
+
+
+            Assert.assertEquals("nt:file", m.get("jcr:primaryType"));
+
+        }
+        {
+            Resource r = repository.get("/test/upload/location/test2.txt/jcr:content");
+            Assert.assertNotNull(r);
+            ValueMap m = r.adaptTo(ValueMap.class);
+            Assert.assertNotNull(m);
+
+
+            Assert.assertEquals("nt:resource", m.get("jcr:primaryType"));
+            Assert.assertTrue(m.get("jcr:lastModified") instanceof Calendar);
+            Assert.assertEquals("text/plain2", m.get("jcr:mimeType"));
+            Assert.assertEquals("test1234", m.get("jcr:data"));
+        }
+
+
+    }
+
+    @Test
+    public void testPartsContentRange() throws RepositoryException, UnsupportedEncodingException {
+        List<Modification> changes = new ArrayList<Modification>();
+        PostResponse response = new AbstractPostResponse() {
+            @Override
+            protected void doSend(HttpServletResponse response) throws IOException {
+
+            }
+
+            @Override
+            public void onChange(String type, String... arguments) {
+
+            }
+
+            @Override
+            public String getPath() {
+                return "/test/upload/location";
+            }
+        };
+
+        List<Part> partsList = new ArrayList<Part>();
+        partsList.add(new MockPart("formfield1", null, null, 0, new ByteArrayInputStream("testformfield1".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        partsList.add(new MockPart("formfield2", null, null, 0, new ByteArrayInputStream("testformfield2".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        partsList.add(new MockPart(
+                "test1.txt",
+                "text/plain",
+                "test1bad.txt",
+                4,
+                new ByteArrayInputStream("test".getBytes("UTF-8")),
+                mapOf("Content-Range","bytes 0-3/8", "Content-Length", "4")));
+        partsList.add(new MockPart(
+                "test1.txt",
+                "text/plain",
+                "test1bad.txt",
+                4,
+                new ByteArrayInputStream("part".getBytes("UTF-8")),
+                mapOf("Content-Range","bytes 4-7/8", "Content-Length", "4")));
+        partsList.add(new MockPart("*", "text/plain2", "test2.txt", 8, new ByteArrayInputStream("test1234".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        partsList.add(new MockPart("badformfield2", null, null, 0, new ByteArrayInputStream("testbadformfield2".getBytes("UTF-8")), Collections.EMPTY_MAP));
+        final Iterator<Part> partsIterator = partsList.iterator();
+        final Map<String, Resource> repository = new HashMap<String, Resource>();
+        final ResourceResolver resourceResolver = new MockResourceResolver() {
+            @Override
+            public Resource getResource(String path) {
+
+                Resource resource = repository.get(path);
+
+                if ( resource == null ) {
+                    if ( "/test/upload/location".equals(path)) {
+                        resource =  new MockRealResource(this, path, "sling:Folder");
+                        repository.put(path,resource);
+                        LOG.debug("Created {} ", path);
+
+                    }
+                }
+                LOG.debug("Resource {} is {} {}", path, resource, ResourceUtil.isSyntheticResource(resource));
+                return resource;
+            }
+
+
+
+
+            @Override
+            public Iterable<Resource> getChildren(Resource resource) {
+
+                List<Resource> children = new ArrayList<Resource>();
+                for(Map.Entry<String, Resource> e : repository.entrySet()) {
+                    if (isChild(resource.getPath(), e.getKey())) {
+                        children.add(e.getValue());
+                    }
+                }
+                return children;
+            }
+
+            private boolean isChild(String path, String key) {
+                if ( key.length() > path.length() && key.startsWith(path)) {
+                    return !key.substring(path.length()+1).contains("/");
+                }
+                return false;
+            }
+
+            @Override
+            public Iterator<Resource> listChildren(Resource parent) {
+                return getChildren(parent).iterator();
+            }
+
+            @Override
+            public void delete(Resource resource) throws PersistenceException {
+
+            }
+
+            @Override
+            public Resource create(Resource resource, String s, Map<String, Object> map) throws PersistenceException {
+                Resource childResource = resource.getChild(s);
+                if ( childResource != null) {
+                    throw new IllegalArgumentException("Child "+s+" already exists ");
+                }
+                String resourceType = (String)map.get("sling:resourceType");
+                if ( resourceType == null) {
+                    resourceType = (String)map.get("jcr:primaryType");
+                }
+                if ( resourceType == null) {
+                    LOG.warn("Resource type null for {} {} ", resource,  resource.getPath()+"/"+s);
+                }
+                Resource newResource = new MockRealResource(this, resource.getPath()+"/"+s, resourceType, map);
+                repository.put(newResource.getPath(), newResource);
+                LOG.debug("Created Resource {} ", newResource.getPath());
+                return newResource;
+            }
+
+            @Override
+            public void revert() {
+
+            }
+
+            @Override
+            public void commit() throws PersistenceException {
+                LOG.debug("Committing");
+                for(Map.Entry<String, Resource> e : repository.entrySet()) {
+                    LOG.debug("Committing {} ", e.getKey());
+                    Resource r = e.getValue();
+                    ModifiableValueMap vm = r.adaptTo(ModifiableValueMap.class);
+                    for (Map.Entry<String, Object> me : vm.entrySet()) {
+                        if (me.getValue() instanceof InputStream) {
+                            try {
+                                String value = IOUtils.toString((InputStream) me.getValue());
+                                LOG.debug("Converted {} {}  ", me.getKey(), value);
+                                vm.put(me.getKey(), value);
+
+                            } catch (IOException e1) {
+                                throw new PersistenceException("Failed to commit input stream", e1);
+                            }
+                        }
+                    }
+                    LOG.debug("Converted {} ", vm);
+                }
+                LOG.debug("Comittted {} ", repository);
+
+
+            }
+
+            @Override
+            public boolean hasChanges() {
+                return false;
+            }
+        };
+
+        SlingHttpServletRequest request = new MockSlingHttpServlet3Request(null, null, null, null, null) {
+            @Override
+            public Object getAttribute(String name) {
+                if ( "request-parts-iterator".equals(name)) {
+                    return partsIterator;
+                }
+                return super.getAttribute(name);
+            }
+
+            @Override
+            public ResourceResolver getResourceResolver() {
+                return resourceResolver;
+            }
+        };
+        streamedUplodOperation.doRun(request, response, changes);
+
+
+        {
+            Resource r = repository.get("/test/upload/location/test1.txt");
+            Assert.assertNotNull(r);
+            ValueMap m = r.adaptTo(ValueMap.class);
+            Assert.assertNotNull(m);
+
+
+            Assert.assertEquals("nt:file", m.get("jcr:primaryType"));
+
+        }
+        {
+            Resource r = repository.get("/test/upload/location/test1.txt/jcr:content");
+            Assert.assertNotNull(r);
+            ValueMap m = r.adaptTo(ValueMap.class);
+            Assert.assertNotNull(m);
+
+            Assert.assertEquals("nt:resource", m.get("jcr:primaryType"));
+            Assert.assertTrue(m.get("jcr:lastModified") instanceof Calendar);
+            Assert.assertEquals("text/plain", m.get("jcr:mimeType"));
+            Assert.assertEquals("testpart", m.get("jcr:data"));
+
+        }
+        {
+            Resource r = repository.get("/test/upload/location/test2.txt");
+            Assert.assertNotNull(r);
+            ValueMap m = r.adaptTo(ValueMap.class);
+            Assert.assertNotNull(m);
+
+
+            Assert.assertEquals("nt:file", m.get("jcr:primaryType"));
+
+        }
+        {
+            Resource r = repository.get("/test/upload/location/test2.txt/jcr:content");
+            Assert.assertNotNull(r);
+            ValueMap m = r.adaptTo(ValueMap.class);
+            Assert.assertNotNull(m);
+
+
+            Assert.assertEquals("nt:resource", m.get("jcr:primaryType"));
+            Assert.assertTrue(m.get("jcr:lastModified") instanceof Calendar);
+            Assert.assertEquals("text/plain2", m.get("jcr:mimeType"));
+            Assert.assertEquals("test1234", m.get("jcr:data"));
+        }
+
+
+    }
+
+
+
+    private Map<String,Object> mapOf(String ... s) {
+        Map<String, Object> m = new HashMap<String, Object>();
+        for (int i = 0; i < s.length; i+=2) {
+            m.put(s[i],s[i+1]);
+        }
+        return m;
+    }
 }