PIG-5359: Reduce time spent in split serialization (satishsaley via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1843214 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index d95e0c4..9146244 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-5359: Reduce time spent in split serialization (satishsaley via rohini)
+
 PIG-5357: BagFactory interface should support creating a distinct bag from a set (jtolar via rohini)
 
 PIG-5354: Show fieldname and a line number for casting errors (knoguchi)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
index 0b74ca8..eb47b71 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
@@ -108,6 +108,7 @@
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezInputHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
@@ -119,6 +120,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.UDFContextSeparator.UDFType;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
@@ -947,30 +949,33 @@
 
             // Currently inputSplitInfo is always InputSplitInfoMem at this point
             if (inputSplitInfo instanceof InputSplitInfoMem) {
-                MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
-                int splitsSerializedSize = splitsProto.getSerializedSize();
-                if(splitsSerializedSize > spillThreshold) {
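+                // Serialize splits into the proto one by one, stopping once
+                // spillThreshold is crossed; any remaining splits are serialized
+                // straight to disk by TezJobSplitWriter instead.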
+                MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
+                Pair<Long, Boolean> serializationInfo = TezInputHelper.createSplitsProto(inputSplitInfo, pigContextConf, splitsBuilder,
+                        spillThreshold);
+                MRSplitsProto splitsProto = splitsBuilder.build();
+                if(!serializationInfo.second) {
+                    // Splits will not be sent via events; write them to disk
                     inputPayLoad.setBoolean(
                             org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
                             false);
-                    // Write splits to disk
-                    Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
-                    log.info("Writing input splits to " + inputSplitsDir
+                    // Write splits to disk
+                    Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
+                    log.info("Writing input splits to " + inputSplitsDir
                             + " for vertex " + vertex.getName()
-                            + " as the serialized size in memory is "
-                            + splitsSerializedSize + ". Configured "
+                            + " as the partially serialized size in memory is "
+                            + serializationInfo.first + ". Configured "
                             + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
                             + " is " + spillThreshold);
-                    inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
-                            (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, payloadConf, fs);
-                    additionalLocalResources = new HashMap<String, LocalResource>();
-                    MRToTezHelper.updateLocalResourcesForInputSplits(
+                    inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
+                            (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, payloadConf, fs, splitsProto);
+                    additionalLocalResources = new HashMap<String, LocalResource>();
+                    MRToTezHelper.updateLocalResourcesForInputSplits(
                             fs, inputSplitInfo,
                             additionalLocalResources);
-                    inputSplitInDiskVertices.add(vertex.getName());
+                    inputSplitInDiskVertices.add(vertex.getName());
                 } else {
-                    // Send splits via RPC to AM
-                    userPayLoadBuilder.setSplits(splitsProto);
+                    // Send splits via RPC to AM
+                    userPayLoadBuilder.setSplits(splitsProto);
                 }
                 //Free up memory
                 tezOp.getLoaderInfo().setInputSplitInfo(null);
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
index d984fef..2e1288b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
@@ -39,6 +39,7 @@
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POSimpleTezLoad;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezInputHelper;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -169,7 +170,7 @@
             tezOp.getLoaderInfo().setInpLimits(inpLimits);
             // Not using MRInputAMSplitGenerator because delegation tokens are
             // fetched in FileInputFormat
-            tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf, false, 0));
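+            // TezInputHelper.generateInputSplitsToMem skips building MRSplitsProto
+            // up front; TezDagBuilder serializes the splits later, only as far as needed.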
+            tezOp.getLoaderInfo().setInputSplitInfo(TezInputHelper.generateInputSplitsToMem(conf));
             // TODO: Can be set to -1 if TEZ-601 gets fixed and getting input
             // splits can be moved to if(loads) block below
             int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
index b604d9f..74d25c5 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
@@ -39,7 +39,6 @@
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -55,6 +54,7 @@
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 
 @InterfaceAudience.Private
 public class MRToTezHelper {
@@ -62,7 +62,6 @@
     private static final Log LOG = LogFactory.getLog(MRToTezHelper.class);
     private static final String JOB_SPLIT_RESOURCE_NAME = MRJobConfig.JOB_SPLIT;
     private static final String JOB_SPLIT_METAINFO_RESOURCE_NAME = MRJobConfig.JOB_SPLIT_METAINFO;
-
     private static Map<String, String> mrAMParamToTezAMParamMap = new HashMap<String, String>();
     private static Map<String, String> mrMapParamToTezVertexParamMap = new HashMap<String, String>();
     private static Map<String, String> mrReduceParamToTezVertexParamMap = new HashMap<String, String>();
@@ -297,14 +296,23 @@
     }
 
     /**
-     * Write input splits (job.split and job.splitmetainfo) to disk
+     * Write input splits (job.split and job.splitmetainfo) to disk, reusing the
+     * already serialized splits from the given MRSplitsProto.
+     * @param infoMem in-memory input split info to write out
+     * @param inputSplitsDir directory to write the split files to
+     * @param jobConf job configuration
+     * @param fs filesystem to write to
+     * @param splitsProto MRSplitsProto containing already serialized splits
+     * @return InputSplitInfoDisk pointing to the split files on disk
+     * @throws IOException
+     * @throws InterruptedException
      */
     public static InputSplitInfoDisk writeInputSplitInfoToDisk(
             InputSplitInfoMem infoMem, Path inputSplitsDir, JobConf jobConf,
-            FileSystem fs) throws IOException, InterruptedException {
+            FileSystem fs, MRSplitsProto splitsProto) throws IOException, InterruptedException {
 
         InputSplit[] splits = infoMem.getNewFormatSplits();
-        JobSplitWriter.createSplitFiles(inputSplitsDir, jobConf, fs, splits);
+        TezJobSplitWriter.createSplitFiles(inputSplitsDir, jobConf, fs, splits, splitsProto);
 
         return new InputSplitInfoDisk(
                 JobSubmissionFiles.getJobSplitFile(inputSplitsDir),
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SerializationInfo.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SerializationInfo.java
new file mode 100644
index 0000000..14ea4da
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SerializationInfo.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+/**
+ * Holder for the outcome of split serialization: the total serialized size and
+ * whether all splits were serialized.
+ */
+public class SerializationInfo {
+    private boolean allSerialized;
+    private long serializedSize = 0L;
+    /**
+     * @return true if all splits are serialized, otherwise false.
+     */
+    public boolean isAllSerialized() {
+        return allSerialized;
+    }
+    public void setAllSerialized(boolean allSerialized) {
+        this.allSerialized = allSerialized;
+    }
+    /**
+     * @return size of serialized splits.
+     */
+    public long getSerializedSize() {
+        return serializedSize;
+    }
+    public void setSerializedSize(long serializedSize) {
+        this.serializedSize = serializedSize;
+    }
+    /**
+     * Increment the serialized size.
+     * @param increment amount to add to the serialized size
+     */
+    public void incrSerializedSize(long increment) {
+        this.serializedSize += increment;
+    }
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezInputHelper.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezInputHelper.java
new file mode 100644
index 0000000..87a475c
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezInputHelper.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.impl.util.Pair;
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+
+public class TezInputHelper {
+    private static final Log LOG = LogFactory.getLog(TezInputHelper.class);
+
+    /**
+     * This method creates input splits similar to
+     * {@link org.apache.tez.mapreduce.hadoop.MRInputHelpers#generateInputSplitsToMem}
+     * but only supports the mapreduce API and does not group splits or create
+     * {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto},
+     * which is an expensive operation.
+     *
+     * @param conf an instance of Configuration. This
+     *        Configuration instance should contain adequate information to
+     *        be able to generate splits - like the InputFormat being used and
+     *        related configuration.
+     * @return an instance of {@link InputSplitInfoMem} which supports a subset
+     *         of the APIs defined on {@link InputSplitInfo}
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf)
+            throws IOException, ClassNotFoundException, InterruptedException {
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Generating mapreduce api input splits");
+        }
+        Job job = Job.getInstance(conf);
+        org.apache.hadoop.mapreduce.InputSplit[] splits = generateSplits(job);
+        return new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
+                splits.length, job.getCredentials(), job.getConfiguration());
+    }
+
+    private static org.apache.hadoop.mapreduce.InputSplit[] generateSplits(JobContext jobContext)
+            throws ClassNotFoundException, IOException, InterruptedException {
+        Configuration conf = jobContext.getConfiguration();
+
+        // This is the real input format.
+        org.apache.hadoop.mapreduce.InputFormat<?, ?> inputFormat = null;
+        try {
+            inputFormat = ReflectionUtils.newInstance(jobContext.getInputFormatClass(), conf);
+        }
+        catch (ClassNotFoundException e) {
+            throw new TezUncheckedException(e);
+        }
+
+        List<org.apache.hadoop.mapreduce.InputSplit> splitList = inputFormat.getSplits(jobContext);
+        org.apache.hadoop.mapreduce.InputSplit[] splits = splitList
+                .toArray(new org.apache.hadoop.mapreduce.InputSplit[splitList.size()]);
+
+        // sort the splits into order based on size, so that the biggest
+        // go first
+        Arrays.sort(splits, new InputSplitComparator());
+        return splits;
+    }
+
+    /**
+     * Comparator for org.apache.hadoop.mapreduce.InputSplit
+     */
+    private static class InputSplitComparator implements Comparator<org.apache.hadoop.mapreduce.InputSplit> {
+        @Override
+        public int compare(org.apache.hadoop.mapreduce.InputSplit o1, org.apache.hadoop.mapreduce.InputSplit o2) {
+            try {
+                long len1 = o1.getLength();
+                long len2 = o2.getLength();
+                if (len1 < len2) {
+                    return 1;
+                }
+                else if (len1 == len2) {
+                    return 0;
+                }
+                else {
+                    return -1;
+                }
+            }
+            catch (IOException ie) {
+                throw new RuntimeException("exception in InputSplit compare", ie);
+            }
+            catch (InterruptedException ie) {
+                throw new RuntimeException("exception in InputSplit compare", ie);
+            }
+        }
+    }
+
+    private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
+            org.apache.hadoop.mapreduce.InputSplit[] newFormatSplits) throws IOException, InterruptedException {
+        List<TaskLocationHint> listLocationHint = new ArrayList<>(newFormatSplits.length);
+        for(org.apache.hadoop.mapreduce.InputSplit input : newFormatSplits) {
+            listLocationHint.add(TaskLocationHint.createTaskLocationHint(
+                    new HashSet<String>(Arrays.asList(input.getLocations())), null));
+        }
+        return listLocationHint;
+    }
+
+    /**
+     * Serializes the splits from inputSplitInfo and adds them to splitsBuilder.
+     * @param inputSplitInfo input split info holding the splits to serialize
+     * @param conf configuration used to look up the serializer
+     * @param splitsBuilder builder the serialized splits are added to
+     * @param spillThreshold serialized size in bytes beyond which serialization stops
+     * @return Pair whose first element is the serialized size in bytes and whose second
+     * element is true if all splits were serialized, or false if serialization stopped
+     * early because spillThreshold was reached.
+     */
+    public static Pair<Long, Boolean> createSplitsProto(InputSplitInfo inputSplitInfo,
+            Configuration conf, MRSplitsProto.Builder splitsBuilder, long spillThreshold) {
+        try {
+            return createSplitsProto(inputSplitInfo.getNewFormatSplits(),
+                    new SerializationFactory(conf), splitsBuilder, spillThreshold);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Serializes the given org.apache.hadoop.mapreduce.InputSplit instances and adds
+     * them to splitsBuilder, stopping once spillThreshold is exceeded.
+     * @param inputSplits splits to serialize
+     * @param serializationFactory factory used to serialize the splits
+     * @param splitsBuilder builder the serialized splits are added to
+     * @param spillThreshold serialized size in bytes beyond which serialization stops
+     * @return Pair of the serialized size and whether all splits were serialized
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private static Pair<Long, Boolean> createSplitsProto(InputSplit[] inputSplits,
+            SerializationFactory serializationFactory, MRSplitsProto.Builder splitsBuilder,
+            long spillThreshold) throws IOException, InterruptedException {
+        MRSplitProto split = null;
+        long serializedSize = 0;
+        boolean allSerialized = true;
+        for (int i = 0; i < inputSplits.length; i++) {
+            split = MRInputHelpers.createSplitProto(inputSplits[i], serializationFactory);
+            serializedSize += split.getSerializedSize();
+            splitsBuilder.addSplits(split);
+            // Check the threshold only after adding the split. serializedSize may end up
+            // exceeding spillThreshold, but we do not want to waste an already serialized
+            // split. If this was the last split, all splits are serialized anyway.
+            if (serializedSize > spillThreshold && i != (inputSplits.length - 1)) {
+                allSerialized = false;
+                break;
+            }
+        }
+        return new Pair<Long, Boolean>(serializedSize, allSerialized);
+    }
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezJobSplitWriter.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezJobSplitWriter.java
new file mode 100644
index 0000000..29382ee
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezJobSplitWriter.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+
+public class TezJobSplitWriter {
+    private static final Log LOG = LogFactory.getLog(TezJobSplitWriter.class);
+    private static final int splitVersion = 1;
+    private static final byte[] SPLIT_FILE_HEADER;
+    static {
+        try {
+            SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
+        }
+        catch (UnsupportedEncodingException u) {
+            throw new RuntimeException(u);
+        }
+    }
+    static final byte[] META_SPLIT_FILE_HEADER;
+    static {
+        try {
+            META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
+        }
+        catch (UnsupportedEncodingException u) {
+            throw new RuntimeException(u);
+        }
+    }
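+    // These headers match the ones written by Hadoop's JobSplitWriter, so the
+    // generated job.split and job.splitmetainfo files stay readable by the
+    // standard split meta info readers.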
+
+    /**
+     * Create split files, writing the splits as well as the split metadata
+     * (job.split and job.splitmetainfo).
+     * @param jobSubmitDir directory to write the split files to
+     * @param conf configuration
+     * @param fs filesystem to write to
+     * @param splits the input splits
+     * @param splitsProto MRSplitsProto containing already serialized splits
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, Configuration conf, FileSystem fs,
+            T[] splits, MRSplitsProto splitsProto) throws IOException, InterruptedException {
+        FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
+        SplitMetaInfo[] info = writeSplits(conf, splits, out, splitsProto);
+        out.close();
+        writeJobSplitMetaInfo(fs, JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
+                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);
+    }
+
+    private static FSDataOutputStream createFile(FileSystem fs, Path splitFile, Configuration job) throws IOException {
+        FSDataOutputStream out = FileSystem.create(fs, splitFile,
+                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+        int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);
+        fs.setReplication(splitFile, (short) replication);
+        writeSplitHeader(out);
+        return out;
+    }
+
+    private static void writeSplitHeader(FSDataOutputStream out) throws IOException {
+        out.write(SPLIT_FILE_HEADER);
+        out.writeInt(splitVersion);
+    }
+
+    /**
+     * Writes the already serialized splits contained in <code>splitsProto</code>,
+     * then serializes and writes the remaining splits.
+     * @param conf configuration
+     * @param array the input splits
+     * @param out stream to write the splits to
+     * @param splitsProto MRSplitsProto containing already serialized splits
+     * @return meta info for each written split
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @SuppressWarnings("unchecked")
+    private static <T extends InputSplit> SplitMetaInfo[] writeSplits(Configuration conf, T[] array,
+            FSDataOutputStream out, MRSplitsProto splitsProto) throws IOException, InterruptedException {
+        SplitMetaInfo[] info = null;
+        if (array.length != 0) {
+            info = new SplitMetaInfo[array.length];
+            SerializationFactory factory = new SerializationFactory(conf);
+            int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
+            long offset = out.getPos();
+            int i = 0;
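+            // First copy the splits already serialized in splitsProto to the stream.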
+            for (MRSplitProto splitProto : splitsProto.getSplitsList()) {
+                long prevCount = out.getPos();
+                Text.writeString(out, splitProto.getSplitClassName());
+                splitProto.getSplitBytes().writeTo(out);
+                info[i] = createSplitMetaInfo(array[i], offset, maxBlockLocations);
+                i++;
+                offset += out.getPos() - prevCount;
+            }
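+            // Then serialize and write any splits not present in splitsProto.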
+            while (i < array.length) {
+                long prevCount = out.getPos();
+                Text.writeString(out, array[i].getClass().getName());
+                Serializer<T> serializer = factory.getSerializer((Class<T>) array[i].getClass());
+                serializer.open(out);
+                serializer.serialize(array[i]);
+                info[i] = createSplitMetaInfo(array[i], offset, maxBlockLocations);
+                i++;
+                offset += out.getPos() - prevCount;
+            }
+        }
+        LOG.info("Size of serialized job.split file is " + out.getPos());
+        return info;
+    }
+
+    /**
+     * Creates SplitMetaInfo for the given split, truncating its locations to at most
+     * maxBlockLocations entries.
+     * @param split the input split
+     * @param offset offset of the split in the job.split file
+     * @param maxBlockLocations maximum number of block locations to record
+     * @return SplitMetaInfo for the split
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private static <T extends InputSplit> SplitMetaInfo createSplitMetaInfo(T split,
+            long offset, int maxBlockLocations) throws IOException, InterruptedException {
+        String[] locations = split.getLocations();
+        if (locations.length > maxBlockLocations) {
+            LOG.warn("Max block location exceeded for split: " + split + " splitsize: " + locations.length
+                    + " maxsize: " + maxBlockLocations);
+            locations = Arrays.copyOf(locations, maxBlockLocations);
+        }
+        return new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
+    }
+
+    private static void writeJobSplitMetaInfo(FileSystem fs, Path filename, FsPermission p, int splitMetaInfoVersion,
+            JobSplit.SplitMetaInfo[] allSplitMetaInfo) throws IOException {
+        // write the splits meta-info to a file for the job tracker
+        FSDataOutputStream out = FileSystem.create(fs, filename, p);
+        out.write(META_SPLIT_FILE_HEADER);
+        WritableUtils.writeVInt(out, splitMetaInfoVersion);
+        WritableUtils.writeVInt(out, allSplitMetaInfo.length);
+        for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
+            splitMetaInfo.write(out);
+        }
+        out.close();
+    }
+
+}