Support data preprocessing in Spark framework
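
This change moves the framework-agnostic preprocessing logic out of the pinot-hadoop plugin so
that it can be shared with a Spark-based preprocessing job: the partitioning/sorting/resizing
config fetching now lives in the common SegmentPreprocessingJob base class, and the Avro/ORC
DataPreprocessingHelper classes (together with the preprocessing mappers, reducers and
partitioners) move into pinot-ingestion-common. The Hadoop plugin keeps thin
HadoopDataPreprocessingHelper wrappers that wire the shared helper into a MapReduce job.

Usage sketch (illustrative only, not part of this patch): how a Hadoop-side caller such as
HadoopSegmentPreprocessingJob drives the refactored helper. The sketch class and the way its
arguments are resolved are assumptions for the example; the helper classes and method
signatures are the ones added in this patch.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelper;
import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelperFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;

public class PreprocessingJobUsageSketch {
  // All arguments are assumed to be resolved by the caller, e.g. from the table config.
  public static Job buildPreprocessingJob(Path inputDir, Path outputDir, TableConfig tableConfig,
      Schema pinotSchema, String partitionColumn, int numPartitions, String partitionFunction,
      String partitionColumnDefaultNullValue, String sortingColumn,
      FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue,
      int numOutputFiles, int maxNumRecordsPerFile)
      throws IOException {
    // The factory inspects inputDir and returns the Avro- or ORC-specific Hadoop wrapper.
    HadoopDataPreprocessingHelper helper =
        HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(inputDir, outputDir);
    // Configs are forwarded to the shared DataPreprocessingHelper in pinot-ingestion-common.
    helper.registerConfigs(tableConfig, pinotSchema, partitionColumn, numPartitions,
        partitionFunction, partitionColumnDefaultNullValue, sortingColumn, sortingColumnType,
        sortingColumnDefaultNullValue, numOutputFiles, maxNumRecordsPerFile);
    // setUpJob() configures input/output paths, sort comparator, partitioner and reducer count.
    return helper.setUpJob();
  }
}
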
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index 85f5c6c..a4b0086 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -18,33 +18,19 @@
  */
 package org.apache.pinot.hadoop.job;
 
-import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelper;
-import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelperFactory;
+import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelper;
+import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelperFactory;
 import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
 import org.apache.pinot.ingestion.common.ControllerRestApi;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableCustomConfig;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,23 +46,6 @@
 public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
   private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
 
-  private String _partitionColumn;
-  private int _numPartitions;
-  private String _partitionFunction;
-  private String _partitionColumnDefaultNullValue;
-
-  private String _sortingColumn;
-  private FieldSpec.DataType _sortingColumnType;
-  private String _sortingColumnDefaultNullValue;
-
-  private int _numOutputFiles;
-  private int _maxNumRecordsPerFile;
-
-  private TableConfig _tableConfig;
-  private org.apache.pinot.spi.data.Schema _pinotTableSchema;
-
-  private Set<DataPreprocessingUtils.Operation> _preprocessingOperations;
-
   public HadoopSegmentPreprocessingJob(final Properties properties) {
     super(properties);
   }
@@ -99,8 +68,8 @@
     // Cleans up preprocessed output dir if exists
     cleanUpPreprocessedOutputs(_preprocessedOutputDir);
 
-    DataPreprocessingHelper dataPreprocessingHelper =
-        DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
+    HadoopDataPreprocessingHelper dataPreprocessingHelper =
+        HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
     dataPreprocessingHelper
         .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
             _partitionColumnDefaultNullValue, _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue,
@@ -130,130 +99,6 @@
     LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
   }
 
-  private void fetchPreProcessingOperations() {
-    _preprocessingOperations = new HashSet<>();
-    TableCustomConfig customConfig = _tableConfig.getCustomConfig();
-    if (customConfig != null) {
-      Map<String, String> customConfigMap = customConfig.getCustomConfigs();
-      if (customConfigMap != null && !customConfigMap.isEmpty()) {
-        String preprocessingOperationsString =
-            customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
-        DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString);
-      }
-    }
-  }
-
-  private void fetchPartitioningConfig() {
-    // Fetch partition info from table config.
-    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) {
-      LOGGER.info("Partitioning is disabled.");
-      return;
-    }
-    SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
-    if (segmentPartitionConfig != null) {
-      Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
-      Preconditions
-          .checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table.");
-      if (columnPartitionMap.size() == 1) {
-        _partitionColumn = columnPartitionMap.keySet().iterator().next();
-        _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn);
-        _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
-        _partitionColumnDefaultNullValue =
-            _pinotTableSchema.getFieldSpecFor(_partitionColumn).getDefaultNullValueString();
-      }
-    } else {
-      LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
-    }
-  }
-
-  private void fetchSortingConfig() {
-    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
-      LOGGER.info("Sorting is disabled.");
-      return;
-    }
-    // Fetch sorting info from table config first.
-    List<String> sortingColumns = new ArrayList<>();
-    List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
-    if (fieldConfigs != null && !fieldConfigs.isEmpty()) {
-      for (FieldConfig fieldConfig : fieldConfigs) {
-        if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
-          sortingColumns.add(fieldConfig.getName());
-        }
-      }
-    }
-    if (!sortingColumns.isEmpty()) {
-      Preconditions.checkArgument(sortingColumns.size() == 1, "There should be at most 1 sorted column in the table.");
-      _sortingColumn = sortingColumns.get(0);
-      return;
-    }
-
-    // There is no sorted column specified in field configs, try to find sorted column from indexing config.
-    IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
-    List<String> sortedColumns = indexingConfig.getSortedColumn();
-    if (sortedColumns != null) {
-      Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
-      if (sortedColumns.size() == 1) {
-        _sortingColumn = sortedColumns.get(0);
-        FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
-        Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: {} in the schema", _sortingColumn);
-        Preconditions
-            .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
-        _sortingColumnType = fieldSpec.getDataType();
-        Preconditions
-            .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType,
-                _sortingColumn);
-        LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
-      }
-    }
-    if (_sortingColumn != null) {
-      _sortingColumnDefaultNullValue = _pinotTableSchema.getFieldSpecFor(_sortingColumn).getDefaultNullValueString();
-    }
-  }
-
-  private void fetchResizingConfig() {
-    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) {
-      LOGGER.info("Resizing is disabled.");
-      return;
-    }
-    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
-    if (tableCustomConfig == null) {
-      _numOutputFiles = 0;
-      return;
-    }
-    Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
-    if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) {
-      _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
-      Preconditions.checkState(_numOutputFiles > 0,
-          String.format("The value of %s should be positive! Current value: %s",
-              InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles));
-    } else {
-      _numOutputFiles = 0;
-    }
-
-    if (customConfigsMap != null) {
-      int maxNumRecords;
-      if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
-        LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.",
-            InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE,
-            InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE);
-        maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
-      } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) {
-        maxNumRecords =
-            Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE));
-      } else {
-        return;
-      }
-      // TODO: add a in-built maximum value for this config to avoid having too many small files.
-      // E.g. if the config is set to 1 which is smaller than this in-built value, the job should be abort from
-      // generating too many small files.
-      Preconditions.checkArgument(maxNumRecords > 0,
-          "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE
-              + " should be positive. Current value: " + maxNumRecords);
-      LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
-      _maxNumRecordsPerFile = maxNumRecords;
-    }
-  }
-
   @Override
   protected Schema getSchema()
       throws IOException {
@@ -275,15 +120,6 @@
   protected void addAdditionalJobProperties(Job job) {
   }
 
-  private void setTableConfigAndSchema()
-      throws IOException {
-    _tableConfig = getTableConfig();
-    _pinotTableSchema = getSchema();
-
-    Preconditions.checkState(_tableConfig != null, "Table config cannot be null.");
-    Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null");
-  }
-
   /**
    * Cleans up outputs in preprocessed output directory.
    */
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 4291790..ac8e2fd 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -35,9 +35,9 @@
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.ingestion.jobs.SegmentCreationJob;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
 import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig;
 import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
deleted file mode 100644
index dc51e71..0000000
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.hadoop.job.preprocess;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
-import org.apache.pinot.hadoop.utils.preprocess.TextComparator;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.IngestionConfigUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public abstract class DataPreprocessingHelper {
-  private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class);
-
-  String _partitionColumn;
-  int _numPartitions;
-  String _partitionFunction;
-  String _partitionColumnDefaultNullValue;
-
-  String _sortingColumn;
-  private FieldSpec.DataType _sortingColumnType;
-  String _sortingColumnDefaultNullValue;
-
-  private int _numOutputFiles;
-  private int _maxNumRecordsPerFile;
-
-  private TableConfig _tableConfig;
-  private Schema _pinotTableSchema;
-
-  List<Path> _inputDataPaths;
-  Path _sampleRawDataPath;
-  Path _outputPath;
-
-  public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
-    _inputDataPaths = inputDataPaths;
-    _sampleRawDataPath = inputDataPaths.get(0);
-    _outputPath = outputPath;
-  }
-
-  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
-      String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
-      FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
-      int maxNumRecordsPerFile) {
-    _tableConfig = tableConfig;
-    _pinotTableSchema = tableSchema;
-    _partitionColumn = partitionColumn;
-    _numPartitions = numPartitions;
-    _partitionFunction = partitionFunction;
-    _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue;
-
-    _sortingColumn = sortingColumn;
-    _sortingColumnType = sortingColumnType;
-    _sortingColumnDefaultNullValue = sortingColumnDefaultNullValue;
-
-    _numOutputFiles = numOutputFiles;
-    _maxNumRecordsPerFile = maxNumRecordsPerFile;
-  }
-
-  public Job setUpJob()
-      throws IOException {
-    LOGGER.info("Initializing a pre-processing job");
-    Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
-    Configuration jobConf = job.getConfiguration();
-    // Input and output paths.
-    int numInputPaths = _inputDataPaths.size();
-    jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
-    setValidationConfigs(job, _sampleRawDataPath);
-    for (Path inputFile : _inputDataPaths) {
-      FileInputFormat.addInputPath(job, inputFile);
-    }
-    setHadoopJobConfigs(job);
-
-    // Sorting column
-    if (_sortingColumn != null) {
-      LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
-      jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn);
-      jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name());
-      jobConf.set(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE, _sortingColumnDefaultNullValue);
-
-      switch (_sortingColumnType) {
-        case INT:
-          job.setMapOutputKeyClass(IntWritable.class);
-          break;
-        case LONG:
-          job.setMapOutputKeyClass(LongWritable.class);
-          break;
-        case FLOAT:
-          job.setMapOutputKeyClass(FloatWritable.class);
-          break;
-        case DOUBLE:
-          job.setMapOutputKeyClass(DoubleWritable.class);
-          break;
-        case STRING:
-          job.setMapOutputKeyClass(Text.class);
-          job.setSortComparatorClass(TextComparator.class);
-          break;
-        default:
-          throw new IllegalStateException();
-      }
-    } else {
-      job.setMapOutputKeyClass(NullWritable.class);
-    }
-
-    // Partition column
-    int numReduceTasks = 0;
-    if (_partitionColumn != null) {
-      numReduceTasks = _numPartitions;
-      jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
-      job.setPartitionerClass(GenericPartitioner.class);
-      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
-      if (_partitionFunction != null) {
-        jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
-      }
-      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE, _partitionColumnDefaultNullValue);
-      jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
-    } else {
-      if (_numOutputFiles > 0) {
-        numReduceTasks = _numOutputFiles;
-      } else {
-        // default number of input paths
-        numReduceTasks = _inputDataPaths.size();
-      }
-    }
-    job.setPartitionerClass(getPartitioner());
-    // Maximum number of records per output file
-    jobConf
-        .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile));
-    // Number of reducers
-    LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
-    job.setNumReduceTasks(numReduceTasks);
-
-    setUpMapperReducerConfigs(job);
-
-    return job;
-  }
-
-  abstract Class<? extends Partitioner> getPartitioner();
-
-  abstract void setUpMapperReducerConfigs(Job job)
-      throws IOException;
-
-  abstract String getSampleTimeColumnValue(String timeColumnName)
-      throws IOException;
-
-  private void setValidationConfigs(Job job, Path path)
-      throws IOException {
-    SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
-
-    // TODO: Serialize and deserialize validation config by creating toJson and fromJson
-    // If the use case is an append use case, check that one time unit is contained in one file. If there is more
-    // than one,
-    // the job should be disabled, as we should not resize for these use cases. Therefore, setting the time column name
-    // and value
-    if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) {
-      job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true");
-      String timeColumnName = validationConfig.getTimeColumnName();
-      job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName);
-      if (timeColumnName != null) {
-        DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
-        if (dateTimeFieldSpec != null) {
-          DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
-          job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
-          job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
-          String sdfPattern = formatSpec.getSDFPattern();
-          if (sdfPattern != null) {
-            job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
-          }
-        }
-      }
-      job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
-          IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
-
-      String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName);
-      if (sampleTimeColumnValue != null) {
-        job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue);
-      }
-    }
-  }
-
-  private void setHadoopJobConfigs(Job job) {
-    job.setJarByClass(HadoopSegmentPreprocessingJob.class);
-    job.setJobName(getClass().getName());
-    FileOutputFormat.setOutputPath(job, _outputPath);
-    job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
-    // Turn this on to always firstly use class paths that user specifies.
-    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
-    // Turn this off since we don't need an empty file in the output directory
-    job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
-
-    String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-    if (hadoopTokenFileLocation != null) {
-      job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
-    }
-  }
-}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java
new file mode 100644
index 0000000..adaef88
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.mappers.AvroDataPreprocessingMapper;
+import org.apache.pinot.ingestion.preprocess.reducers.AvroDataPreprocessingReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
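+/**
+ * Hadoop MapReduce wiring for Avro inputs: reads the sample file's Avro schema, validates the
+ * registered configs against it, and sets up AvroKeyInputFormat with the Avro preprocessing
+ * mapper/reducer and a lazy AvroKeyOutputFormat.
+ */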
+public class HadoopAvroDataPreprocessingHelper extends HadoopDataPreprocessingHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopAvroDataPreprocessingHelper.class);
+
+  public HadoopAvroDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
+  }
+
+  @Override
+  public void setUpMapperReducerConfigs(Job job)
+      throws IOException {
+    Schema avroSchema = (Schema) getSchema(_dataPreprocessingHelper._sampleRawDataPath);
+    LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
+    validateConfigsAgainstSchema(avroSchema);
+
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    job.setMapperClass(AvroDataPreprocessingMapper.class);
+
+    job.setReducerClass(AvroDataPreprocessingReducer.class);
+    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
+    AvroMultipleOutputs.setCountersEnabled(job, true);
+    // Use LazyOutputFormat to avoid creating empty files.
+    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+    job.setOutputKeyClass(AvroKey.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    AvroJob.setInputKeySchema(job, avroSchema);
+    AvroJob.setMapOutputValueSchema(job, avroSchema);
+    AvroJob.setOutputKeySchema(job, avroSchema);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java
new file mode 100644
index 0000000..bf89c62
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.partitioners.GenericPartitioner;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
+import org.apache.pinot.ingestion.utils.preprocess.TextComparator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
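+/**
+ * Hadoop-specific wrapper around the shared {@link DataPreprocessingHelper}: translates the
+ * registered partitioning, sorting and resizing configs into MapReduce job settings (map output
+ * key class, partitioner, number of reducers, etc.) and delegates format-specific mapper/reducer
+ * wiring to its subclasses.
+ */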
+public abstract class HadoopDataPreprocessingHelper implements HadoopJobPreparer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelper.class);
+
+  protected DataPreprocessingHelper _dataPreprocessingHelper;
+
+  public HadoopDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    _dataPreprocessingHelper = dataPreprocessingHelper;
+  }
+
+  public Job setUpJob()
+      throws IOException {
+    LOGGER.info("Initializing a pre-processing job");
+    Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
+    Configuration jobConf = job.getConfiguration();
+    // Input and output paths.
+    int numInputPaths = _dataPreprocessingHelper._inputDataPaths.size();
+    jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
+    _dataPreprocessingHelper.setValidationConfigs(job, _dataPreprocessingHelper._sampleRawDataPath);
+    for (Path inputFile : _dataPreprocessingHelper._inputDataPaths) {
+      FileInputFormat.addInputPath(job, inputFile);
+    }
+    setHadoopJobConfigs(job);
+
+    // Sorting column
+    if (_dataPreprocessingHelper._sortingColumn != null) {
+      LOGGER.info("Adding sorting column: {} to job config", _dataPreprocessingHelper._sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _dataPreprocessingHelper._sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _dataPreprocessingHelper._sortingColumnType.name());
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE,
+          _dataPreprocessingHelper._sortingColumnDefaultNullValue);
+
+      switch (_dataPreprocessingHelper._sortingColumnType) {
+        case INT:
+          job.setMapOutputKeyClass(IntWritable.class);
+          break;
+        case LONG:
+          job.setMapOutputKeyClass(LongWritable.class);
+          break;
+        case FLOAT:
+          job.setMapOutputKeyClass(FloatWritable.class);
+          break;
+        case DOUBLE:
+          job.setMapOutputKeyClass(DoubleWritable.class);
+          break;
+        case STRING:
+          job.setMapOutputKeyClass(Text.class);
+          job.setSortComparatorClass(TextComparator.class);
+          break;
+        default:
+          throw new IllegalStateException(
+              "Unsupported sorting column type: " + _dataPreprocessingHelper._sortingColumnType);
+      }
+    } else {
+      job.setMapOutputKeyClass(NullWritable.class);
+    }
+
+    // Partition column
+    int numReduceTasks = 0;
+    if (_dataPreprocessingHelper._partitionColumn != null) {
+      numReduceTasks = _dataPreprocessingHelper._numPartitions;
+      jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
+      job.setPartitionerClass(GenericPartitioner.class);
+      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _dataPreprocessingHelper._partitionColumn);
+      if (_dataPreprocessingHelper._partitionFunction != null) {
+        jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _dataPreprocessingHelper._partitionFunction);
+      }
+      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE,
+          _dataPreprocessingHelper._partitionColumnDefaultNullValue);
+      jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
+    } else {
+      if (_dataPreprocessingHelper._numOutputFiles > 0) {
+        numReduceTasks = _dataPreprocessingHelper._numOutputFiles;
+      } else {
+        // default number of input paths
+        numReduceTasks = _dataPreprocessingHelper._inputDataPaths.size();
+      }
+    }
+    job.setPartitionerClass(_dataPreprocessingHelper.getPartitioner());
+    // Maximum number of records per output file
+    jobConf.set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE,
+        Integer.toString(_dataPreprocessingHelper._maxNumRecordsPerFile));
+    // Number of reducers
+    LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+    job.setNumReduceTasks(numReduceTasks);
+
+    setUpMapperReducerConfigs(job);
+
+    return job;
+  }
+
+  private void setHadoopJobConfigs(Job job) {
+    job.setJarByClass(HadoopSegmentPreprocessingJob.class);
+    job.setJobName(getClass().getName());
+    FileOutputFormat.setOutputPath(job, _dataPreprocessingHelper._outputPath);
+    job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
+    // Turn this on so that the class paths specified by the user are always used first.
+    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
+    // Turn this off since we don't need an empty file in the output directory
+    job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
+
+    String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+    if (hadoopTokenFileLocation != null) {
+      job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
+    }
+  }
+
+  public Object getSchema(Path inputPathDir)
+      throws IOException {
+    return _dataPreprocessingHelper.getSchema(inputPathDir);
+  }
+
+  public void validateConfigsAgainstSchema(Object schema) {
+    _dataPreprocessingHelper.validateConfigsAgainstSchema(schema);
+  }
+
+  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+      String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
+      FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
+      int maxNumRecordsPerFile) {
+    _dataPreprocessingHelper
+        .registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction,
+            partitionColumnDefaultNullValue,
+            sortingColumn, sortingColumnType, sortingColumnDefaultNullValue, numOutputFiles, maxNumRecordsPerFile);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
similarity index 65%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
index 43ee97b..c8dec82 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
@@ -22,38 +22,39 @@
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.fs.Path;
-import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class DataPreprocessingHelperFactory {
-  private DataPreprocessingHelperFactory() {
+public class HadoopDataPreprocessingHelperFactory {
+  private HadoopDataPreprocessingHelperFactory() {
   }
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelperFactory.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelperFactory.class);
 
-  public static DataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
+  public static HadoopDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
       throws IOException {
     final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION);
     final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION);
 
     int numAvroFiles = avroFiles.size();
     int numOrcFiles = orcFiles.size();
-    Preconditions
-        .checkState(numAvroFiles == 0 || numOrcFiles == 0,
-            "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
-            inputPaths);
+    Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0,
+        "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
+        inputPaths);
     Preconditions
         .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s",
             inputPaths);
 
     if (numAvroFiles > 0) {
       LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths);
-      return new AvroDataPreprocessingHelper(avroFiles, outputPath);
+      return new HadoopAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroFiles, outputPath));
     } else {
       LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths);
-      return new OrcDataPreprocessingHelper(orcFiles, outputPath);
+      return new HadoopOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcFiles, outputPath));
     }
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java
new file mode 100644
index 0000000..c4834cf
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.Job;
+
+
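+/**
+ * Hook for format-specific mapper and reducer setup of a Hadoop pre-processing job.
+ */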
+public interface HadoopJobPreparer {
+
+  void setUpMapperReducerConfigs(Job job) throws IOException;
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java
new file mode 100644
index 0000000..0d9ee78
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.orc.OrcConf;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.orc.mapreduce.OrcInputFormat;
+import org.apache.orc.mapreduce.OrcOutputFormat;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.mappers.OrcDataPreprocessingMapper;
+import org.apache.pinot.ingestion.preprocess.reducers.OrcDataPreprocessingReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
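+/**
+ * Hadoop MapReduce wiring for ORC inputs: reads the sample file's ORC schema, validates the
+ * registered configs against it, and sets up OrcInputFormat, the ORC preprocessing
+ * mapper/reducer, the shuffle/output ORC schemas and a lazy OrcOutputFormat.
+ */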
+public class HadoopOrcDataPreprocessingHelper extends HadoopDataPreprocessingHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopOrcDataPreprocessingHelper.class);
+
+  public HadoopOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
+  }
+
+  @Override
+  public void setUpMapperReducerConfigs(Job job)
+      throws IOException {
+    Object orcSchema = getSchema(_dataPreprocessingHelper._sampleRawDataPath);
+    String orcSchemaString = orcSchema.toString();
+    LOGGER.info("Orc schema is: {}", orcSchemaString);
+    validateConfigsAgainstSchema(orcSchema);
+
+    job.setInputFormatClass(OrcInputFormat.class);
+    job.setMapperClass(OrcDataPreprocessingMapper.class);
+    job.setMapOutputValueClass(OrcValue.class);
+    Configuration jobConf = job.getConfiguration();
+    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString);
+
+    job.setReducerClass(OrcDataPreprocessingReducer.class);
+    // Use LazyOutputFormat to avoid creating empty files.
+    LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(OrcStruct.class);
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperTest.java
similarity index 78%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperTest.java
index 2f97f72..5f74646 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperTest.java
@@ -19,13 +19,13 @@
 package org.apache.pinot.hadoop.job.preprocess;
 
 import com.google.common.base.Preconditions;
+import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -34,6 +34,8 @@
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -41,18 +43,34 @@
 import static org.testng.Assert.assertNull;
 
 
-public class DataPreprocessingHelperTest {
+public class HadoopDataPreprocessingHelperTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "HadoopDataPreprocessingHelperTest");
+
+  @BeforeClass
+  public static void setUp()
+      throws IOException {
+    String pathString = Preconditions
+        .checkNotNull(
+            HadoopDataPreprocessingHelperTest.class.getClassLoader().getResource("data/test_sample_data.avro"))
+        .getPath();
+
+    // Copy the input path to a temp directory.
+    FileUtils.deleteQuietly(TEMP_DIR);
+    FileUtils.forceMkdir(TEMP_DIR);
+    FileUtils.copyFileToDirectory(new File(pathString), TEMP_DIR);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    FileUtils.deleteQuietly(TEMP_DIR);
+  }
 
   @Test
   public void testDataPreprocessingHelper()
       throws IOException {
-    List<Path> inputPaths = new ArrayList<>();
-    String pathString = Preconditions
-        .checkNotNull(DataPreprocessingHelperTest.class.getClassLoader().getResource("data/test_sample_data.avro"))
-        .getPath();
-    inputPaths.add(new Path(pathString));
     Path outputPath = new Path("mockOutputPath");
-    DataPreprocessingHelper dataPreprocessingHelper = new AvroDataPreprocessingHelper(inputPaths, outputPath);
+    HadoopDataPreprocessingHelper dataPreprocessingHelper =
+        HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(new Path(TEMP_DIR.toString()), outputPath);
 
     BatchIngestionConfig batchIngestionConfig = new BatchIngestionConfig(null, "APPEND", "DAILY");
     IngestionConfig ingestionConfig = new IngestionConfig(batchIngestionConfig, null, null, null, null);
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
index 965a98a..3d7b489 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
@@ -117,5 +117,14 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
index 70c71e3..df6d462 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
@@ -21,11 +21,25 @@
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pinot.ingestion.common.ControllerRestApi;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableCustomConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +53,7 @@
  * * enable.preprocessing: false by default. Enables preprocessing job.
  */
 public abstract class SegmentPreprocessingJob extends BaseSegmentJob {
-  private static final Logger _logger = LoggerFactory.getLogger(SegmentPreprocessingJob.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingJob.class);
   protected final Path _schemaFile;
   protected final Path _inputSegmentDir;
   protected final Path _preprocessedOutputDir;
@@ -47,6 +61,23 @@
   protected final Path _pathToDependencyJar;
   protected boolean _enablePreprocessing;
 
+  protected String _partitionColumn;
+  protected int _numPartitions;
+  protected String _partitionFunction;
+  protected String _partitionColumnDefaultNullValue;
+
+  protected String _sortingColumn;
+  protected FieldSpec.DataType _sortingColumnType;
+  protected String _sortingColumnDefaultNullValue;
+
+  protected int _numOutputFiles;
+  protected int _maxNumRecordsPerFile;
+
+  protected TableConfig _tableConfig;
+  protected org.apache.pinot.spi.data.Schema _pinotTableSchema;
+
+  protected Set<DataPreprocessingUtils.Operation> _preprocessingOperations;
+
   public SegmentPreprocessingJob(final Properties properties) {
     super(properties);
 
@@ -60,13 +91,13 @@
     _pathToDependencyJar = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
     _schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA);
 
-    _logger.info("*********************************************************************");
-    _logger.info("enable.preprocessing: {}", _enablePreprocessing);
-    _logger.info("path.to.input: {}", _inputSegmentDir);
-    _logger.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
-    _logger.info("path.to.deps.jar: {}", _pathToDependencyJar);
-    _logger.info("push.locations: {}", _pushLocations);
-    _logger.info("*********************************************************************");
+    LOGGER.info("*********************************************************************");
+    LOGGER.info("enable.preprocessing: {}", _enablePreprocessing);
+    LOGGER.info("path.to.input: {}", _inputSegmentDir);
+    LOGGER.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
+    LOGGER.info("path.to.deps.jar: {}", _pathToDependencyJar);
+    LOGGER.info("push.locations: {}", _pushLocations);
+    LOGGER.info("*********************************************************************");
   }
 
   protected abstract void run()
@@ -91,4 +122,137 @@
     // TODO: support orc format in the future.
     return fileName.endsWith(".avro");
   }
+
+  protected void setTableConfigAndSchema()
+      throws IOException {
+    _tableConfig = getTableConfig();
+    _pinotTableSchema = getSchema();
+
+    Preconditions.checkState(_tableConfig != null, "Table config cannot be null.");
+    Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null");
+  }
+
+  protected void fetchPreProcessingOperations() {
+    _preprocessingOperations = new HashSet<>();
+    TableCustomConfig customConfig = _tableConfig.getCustomConfig();
+    if (customConfig != null) {
+      Map<String, String> customConfigMap = customConfig.getCustomConfigs();
+      if (customConfigMap != null && !customConfigMap.isEmpty()) {
+        String preprocessingOperationsString =
+            customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
+        DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString);
+      }
+    }
+  }
+
+  protected void fetchPartitioningConfig() {
+    // Fetch partition info from table config.
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) {
+      LOGGER.info("Partitioning is disabled.");
+      return;
+    }
+    SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+    if (segmentPartitionConfig != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
+      Preconditions
+          .checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table.");
+      if (columnPartitionMap.size() == 1) {
+        _partitionColumn = columnPartitionMap.keySet().iterator().next();
+        _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn);
+        _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
+        _partitionColumnDefaultNullValue =
+            _pinotTableSchema.getFieldSpecFor(_partitionColumn).getDefaultNullValueString();
+      }
+    } else {
+      LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+    }
+  }
+
+  protected void fetchSortingConfig() {
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
+      LOGGER.info("Sorting is disabled.");
+      return;
+    }
+    // Fetch sorting info from table config first.
+    List<String> sortingColumns = new ArrayList<>();
+    List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
+    if (fieldConfigs != null && !fieldConfigs.isEmpty()) {
+      for (FieldConfig fieldConfig : fieldConfigs) {
+        if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
+          sortingColumns.add(fieldConfig.getName());
+        }
+      }
+    }
+    if (!sortingColumns.isEmpty()) {
+      Preconditions.checkArgument(sortingColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
+      _sortingColumn = sortingColumns.get(0);
+      return;
+    }
+
+    // There is no sorted column specified in field configs, try to find sorted column from indexing config.
+    IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+    List<String> sortedColumns = indexingConfig.getSortedColumn();
+    if (sortedColumns != null) {
+      Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
+      if (sortedColumns.size() == 1) {
+        _sortingColumn = sortedColumns.get(0);
+        FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
+        Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: %s in the schema", _sortingColumn);
+        Preconditions
+            .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
+        _sortingColumnType = fieldSpec.getDataType();
+        Preconditions
+            .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType,
+                _sortingColumn);
+        LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
+      }
+    }
+    if (_sortingColumn != null) {
+      _sortingColumnDefaultNullValue = _pinotTableSchema.getFieldSpecFor(_sortingColumn).getDefaultNullValueString();
+    }
+  }
+
+  protected void fetchResizingConfig() {
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) {
+      LOGGER.info("Resizing is disabled.");
+      return;
+    }
+    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
+    if (tableCustomConfig == null) {
+      _numOutputFiles = 0;
+      return;
+    }
+    Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
+    if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) {
+      _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
+      Preconditions.checkState(_numOutputFiles > 0, String
+          .format("The value of %s should be positive! Current value: %s",
+              InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles));
+    } else {
+      _numOutputFiles = 0;
+    }
+
+    if (customConfigsMap != null) {
+      int maxNumRecords;
+      if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
+        LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.",
+            InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE,
+            InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE);
+        maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
+      } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) {
+        maxNumRecords =
+            Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE));
+      } else {
+        return;
+      }
+      // TODO: add a built-in maximum value for this config to avoid generating too many small files.
+      // E.g. if the config is set to 1, which is smaller than this built-in value,
+      // the job should abort instead of generating too many small files.
+      Preconditions.checkArgument(maxNumRecords > 0,
+          "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE
+              + " should be positive. Current value: " + maxNumRecords);
+      LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
+      _maxNumRecordsPerFile = maxNumRecords;
+    }
+  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
similarity index 73%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
index 9e5f5f2..3be4768 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -26,23 +26,13 @@
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper;
-import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner;
-import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.ingestion.preprocess.partitioners.AvroDataPreprocessingPartitioner;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,30 +51,19 @@
   }
 
   @Override
-  public void setUpMapperReducerConfigs(Job job)
+  public Object getSchema(Path inputPathDir)
       throws IOException {
-    Schema avroSchema = getAvroSchema(_sampleRawDataPath);
-    LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
-    validateConfigsAgainstSchema(avroSchema);
-
-    job.setInputFormatClass(AvroKeyInputFormat.class);
-    job.setMapperClass(AvroDataPreprocessingMapper.class);
-
-    job.setReducerClass(AvroDataPreprocessingReducer.class);
-    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
-    AvroMultipleOutputs.setCountersEnabled(job, true);
-    // Use LazyOutputFormat to avoid creating empty files.
-    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-    job.setOutputKeyClass(AvroKey.class);
-    job.setOutputValueClass(NullWritable.class);
-
-    AvroJob.setInputKeySchema(job, avroSchema);
-    AvroJob.setMapOutputValueSchema(job, avroSchema);
-    AvroJob.setOutputKeySchema(job, avroSchema);
+    return getAvroSchema(inputPathDir);
   }
 
   @Override
-  String getSampleTimeColumnValue(String timeColumnName)
+  public void validateConfigsAgainstSchema(Object schema) {
+    Schema avroSchema = (Schema) schema;
+    validateConfigsAgainstSchema(avroSchema);
+  }
+
+  @Override
+  public String getSampleTimeColumnValue(String timeColumnName)
       throws IOException {
     String sampleTimeColumnValue;
     try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(_sampleRawDataPath)) {
@@ -99,7 +78,7 @@
    * @return Input schema
    * @throws IOException exception when accessing to IO
    */
-  private Schema getAvroSchema(Path inputPathDir)
+  protected Schema getAvroSchema(Path inputPathDir)
       throws IOException {
     Schema avroSchema = null;
     for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) {
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java
new file mode 100644
index 0000000..6ef165f
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.preprocess;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
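+/**
+ * Base helper that holds the preprocessing configs (partition column/function, sorting column,
+ * output file resizing) together with the table config and schema, so that the framework-specific
+ * (Hadoop or Spark) preprocessing helpers can share them.
+ */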
+public abstract class DataPreprocessingHelper implements SampleTimeColumnExtractable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class);
+
+  public String _partitionColumn;
+  public int _numPartitions;
+  public String _partitionFunction;
+  public String _partitionColumnDefaultNullValue;
+
+  public String _sortingColumn;
+  public FieldSpec.DataType _sortingColumnType;
+  public String _sortingColumnDefaultNullValue;
+
+  public int _numOutputFiles;
+  public int _maxNumRecordsPerFile;
+
+  public TableConfig _tableConfig;
+  public Schema _pinotTableSchema;
+
+  public List<Path> _inputDataPaths;
+  public Path _sampleRawDataPath;
+  public Path _outputPath;
+
+  public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
+    _inputDataPaths = inputDataPaths;
+    _sampleRawDataPath = inputDataPaths.get(0);
+    _outputPath = outputPath;
+  }
+
+  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+      String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
+      FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
+      int maxNumRecordsPerFile) {
+    _tableConfig = tableConfig;
+    _pinotTableSchema = tableSchema;
+    _partitionColumn = partitionColumn;
+    _numPartitions = numPartitions;
+    _partitionFunction = partitionFunction;
+    _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue;
+
+    _sortingColumn = sortingColumn;
+    _sortingColumnType = sortingColumnType;
+    _sortingColumnDefaultNullValue = sortingColumnDefaultNullValue;
+
+    _numOutputFiles = numOutputFiles;
+    _maxNumRecordsPerFile = maxNumRecordsPerFile;
+  }
+
+  public abstract Class<? extends Partitioner> getPartitioner();
+
+  public abstract Object getSchema(Path inputPathDir)
+      throws IOException;
+
+  public abstract void validateConfigsAgainstSchema(Object schema);
+
+  public void setValidationConfigs(Job job, Path path)
+      throws IOException {
+    SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+
+    // TODO: Serialize and deserialize validation config by creating toJson and fromJson
+    // If the use case is an append use case, check that one time unit is contained in one file.
+    // If there is more than one, the job should be disabled, as we should not resize for these use cases.
+    // Therefore, set the time column name and value here.
+    if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) {
+      job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true");
+      String timeColumnName = validationConfig.getTimeColumnName();
+      if (timeColumnName != null) {
+        job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName);
+        DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
+        if (dateTimeFieldSpec != null) {
+          DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+          job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
+          job.getConfiguration()
+              .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
+          String sdfPattern = formatSpec.getSDFPattern();
+          if (sdfPattern != null) {
+            job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
+          }
+        }
+      }
+      job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
+          IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
+
+      String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName);
+      if (sampleTimeColumnValue != null) {
+        job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue);
+      }
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
similarity index 95%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
index 43ee97b..c89e30d 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.fs.Path;
-import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
similarity index 82%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
index 24c0473..4371cda 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -30,23 +29,13 @@
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.mapred.OrcStruct;
-import org.apache.orc.mapred.OrcValue;
-import org.apache.orc.mapreduce.OrcInputFormat;
-import org.apache.orc.mapreduce.OrcOutputFormat;
-import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper;
-import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner;
-import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.ingestion.preprocess.partitioners.OrcDataPreprocessingPartitioner;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,33 +51,24 @@
   }
 
   @Override
-  Class<? extends Partitioner> getPartitioner() {
+  public Class<? extends Partitioner> getPartitioner() {
     return OrcDataPreprocessingPartitioner.class;
   }
 
   @Override
-  void setUpMapperReducerConfigs(Job job) {
-    TypeDescription orcSchema = getOrcSchema(_sampleRawDataPath);
-    String orcSchemaString = orcSchema.toString();
-    LOGGER.info("Orc schema is: {}", orcSchemaString);
-    validateConfigsAgainstSchema(orcSchema);
-
-    job.setInputFormatClass(OrcInputFormat.class);
-    job.setMapperClass(OrcDataPreprocessingMapper.class);
-    job.setMapOutputValueClass(OrcValue.class);
-    Configuration jobConf = job.getConfiguration();
-    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString);
-
-    job.setReducerClass(OrcDataPreprocessingReducer.class);
-    // Use LazyOutputFormat to avoid creating empty files.
-    LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
-    job.setOutputKeyClass(NullWritable.class);
-    job.setOutputValueClass(OrcStruct.class);
-    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString);
+  public Object getSchema(Path inputPathDir)
+      throws IOException {
+    return getOrcSchema(inputPathDir);
   }
 
   @Override
-  String getSampleTimeColumnValue(String timeColumnName)
+  public void validateConfigsAgainstSchema(Object schema) {
+    TypeDescription orcSchema = (TypeDescription) schema;
+    validateConfigsAgainstSchema(orcSchema);
+  }
+
+  @Override
+  public String getSampleTimeColumnValue(String timeColumnName)
       throws IOException {
     try (Reader reader = OrcFile
         .createReader(_sampleRawDataPath, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) {
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java
new file mode 100644
index 0000000..feaceee
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.preprocess;
+
+import java.io.IOException;
+
+
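+/**
+ * Extracts a sample value of the given time column from the input data; the value is used to
+ * populate the time column value config for append use cases.
+ */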
+public interface SampleTimeColumnExtractable {
+
+  String getSampleTimeColumnValue(String timeColumnName)
+      throws IOException;
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
similarity index 95%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
index d9f17ac..0e5e33b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -27,8 +27,8 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.slf4j.Logger;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
similarity index 93%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
index 8ad9d84..9180286 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -27,9 +27,9 @@
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
-import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.preprocess.OrcUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
similarity index 97%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
index 5ae8966..3ddd255 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -30,8 +30,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
similarity index 96%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
index 5f2ae4d..a19d24c 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import com.google.common.base.Preconditions;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -27,7 +27,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.slf4j.Logger;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
similarity index 95%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
index 118d310..0a066c2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
@@ -16,14 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
similarity index 95%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
index 853502c..cd36bdf 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import com.google.common.base.Preconditions;
 import java.util.List;
@@ -28,8 +28,8 @@
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.preprocess.OrcUtils;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
similarity index 97%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
index 0ba57db..1826bdc 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
similarity index 96%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
index 62ed4a3..f9a046c 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.reducers;
+package org.apache.pinot.ingestion.preprocess.reducers;
 
 import java.io.IOException;
 import org.apache.avro.generic.GenericRecord;
@@ -27,7 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
similarity index 96%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
index a3387a2..10d3f10 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.reducers;
+package org.apache.pinot.ingestion.preprocess.reducers;
 
 import java.io.IOException;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -27,7 +27,7 @@
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
similarity index 98%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
index 1998399..0e60d2d 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils;
 
 import java.util.Set;
 import org.apache.hadoop.io.DoubleWritable;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
similarity index 98%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
index b26fc38..b8fa59a 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job;
+package org.apache.pinot.ingestion.utils;
 
 /**
  * Internal-only constants for Hadoop MapReduce jobs. These constants are propagated across different segment creation
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
similarity index 97%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
index 58e1c1d..a6e4ca7 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
similarity index 95%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
index 0596259..e4cdf5e 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
similarity index 97%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
index dcfc3b5..09eb218 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.io.BooleanWritable;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
similarity index 96%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
index 4def5bd..9e52957 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
index 6427501..20a5734 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
@@ -106,6 +106,10 @@
           <groupId>org.apache.pinot</groupId>
           <artifactId>pinot-common</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.esotericsoftware</groupId>
+          <artifactId>kryo-shaded</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -282,6 +286,10 @@
       </exclusions>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+    </dependency>
 
     <!--Test-->
     <dependency>
@@ -379,6 +387,14 @@
           <groupId>commons-pool</groupId>
           <artifactId>commons-pool</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro-mapred</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
       </exclusions>
       <scope>test</scope>
     </dependency>
@@ -463,8 +479,16 @@
           <groupId>org.apache.orc</groupId>
           <artifactId>orc-core</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.orc</groupId>
+          <artifactId>orc-mapreduce</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-hadoop</artifactId>
+        </exclusion>
       </exclusions>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -480,7 +504,7 @@
           <artifactId>objenesis</artifactId>
         </exclusion>
       </exclusions>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.derby</groupId>
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java
new file mode 100644
index 0000000..4743d1f
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
+import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelper;
+import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelperFactory;
+import org.apache.pinot.spark.utils.HadoopUtils;
+import org.apache.pinot.spark.utils.PinotSparkJobPreparationHelper;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A Spark job that partitions, sorts, and resizes the input files, which contain raw data
+ * in either Avro or ORC format.
+ * As a result, the output files are partitioned, sorted, and resized once this job finishes.
+ * In order to run this job, the following config needs to be specified in the job properties:
+ * * enable.preprocessing: false by default. Set to true to enable the preprocessing job.
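+ *
+ * A minimal, illustrative job properties snippet (all other required ingestion job properties are omitted):
+ *   enable.preprocessing=true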
+ */
+public class SparkSegmentPreprocessingJob extends SegmentPreprocessingJob {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkSegmentPreprocessingJob.class);
+
+  public SparkSegmentPreprocessingJob(Properties properties) {
+    super(properties);
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    if (!_enablePreprocessing) {
+      LOGGER.info("Pre-processing job is disabled.");
+      return;
+    } else {
+      LOGGER.info("Starting {}", getClass().getSimpleName());
+    }
+
+    setTableConfigAndSchema();
+    fetchPreProcessingOperations();
+    fetchPartitioningConfig();
+    fetchSortingConfig();
+    fetchResizingConfig();
+
+    // Cleans up preprocessed output dir if exists
+    cleanUpPreprocessedOutputs(_preprocessedOutputDir);
+
+    SparkDataPreprocessingHelper dataPreprocessingHelper =
+        SparkDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
+    dataPreprocessingHelper
+        .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
+            _partitionColumnDefaultNullValue, _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue,
+            _numOutputFiles, _maxNumRecordsPerFile);
+
+    // Set up and execute spark job.
+    JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+    addDepsJarToDistributedCache(javaSparkContext);
+
+    SparkSession sparkSession =
+        SparkSession.builder().appName(SparkSegmentPreprocessingJob.class.getSimpleName()).getOrCreate();
+
+    dataPreprocessingHelper.setUpAndExecuteJob(sparkSession);
+  }
+
+  /**
+   * Cleans up outputs in preprocessed output directory.
+   */
+  public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir)
+      throws IOException {
+    if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) {
+      LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir);
+      HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true);
+    }
+  }
+
+  protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext)
+      throws IOException {
+    if (_pathToDependencyJar != null) {
+      PinotSparkJobPreparationHelper
+          .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, sparkContext, _pathToDependencyJar);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java
new file mode 100644
index 0000000..23f17e8
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+
+
+public class SparkAvroDataPreprocessingHelper extends SparkDataPreprocessingHelper {
+  public SparkAvroDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
+  }
+
+  @Override
+  public String getDataFormat() {
+    return "avro";
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java
new file mode 100644
index 0000000..7a7a564
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import com.google.common.collect.Ordering;
+import java.io.Serializable;
+
+
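+/**
+ * Compares job keys by the value of the configured sorting column when the Spark preprocessing
+ * job sorts records; keys without a sorting column value are treated as equal.
+ */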
+public class SparkDataPreprocessingComparator extends Ordering<Object> implements Serializable {
+  @Override
+  public int compare(Object left, Object right) {
+    Object value1 = ((SparkDataPreprocessingJobKey) left).getSortedColumn();
+    Object value2 = ((SparkDataPreprocessingJobKey) right).getSortedColumn();
+    if (value1 == null || value2 == null) {
+      // Records without a sorting column value are treated as equal to avoid NPEs below.
+      return 0;
+    }
+    if (value1 instanceof Integer) {
+      return Integer.compare((int) value1, (int) value2);
+    } else if (value1 instanceof Long) {
+      return Long.compare((long) value1, (long) value2);
+    } else if (value1 instanceof Float) {
+      return Float.compare((float) value1, (float) value2);
+    } else if (value1 instanceof Double) {
+      return Double.compare((double) value1, (double) value2);
+    } else if (value1 instanceof Short) {
+      return Short.compare((short) value1, (short) value2);
+    } else if (value1 instanceof String) {
+      return ((String) value1).compareTo((String) value2);
+    }
+    throw new RuntimeException("Unsupported data type: " + value1.getClass().getName());
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java
new file mode 100644
index 0000000..a73d9a6
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.partitioners.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+
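+/**
+ * Base class that wraps a format-agnostic {@link DataPreprocessingHelper} and runs the preprocessing as a Spark job:
+ * the input files are loaded into a DataFrame, each row is keyed by (partition id, sorting column value), the RDD is
+ * repartitioned and sorted within partitions, and the result is written to the configured output path.
+ */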
+public abstract class SparkDataPreprocessingHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataPreprocessingHelper.class);
+
+  protected DataPreprocessingHelper _dataPreprocessingHelper;
+
+  public SparkDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    _dataPreprocessingHelper = dataPreprocessingHelper;
+  }
+
+  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+      String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
+      FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
+      int maxNumRecordsPerFile) {
+    _dataPreprocessingHelper
+        .registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction,
+            partitionColumnDefaultNullValue, sortingColumn, sortingColumnType, sortingColumnDefaultNullValue,
+            numOutputFiles, maxNumRecordsPerFile);
+  }
+
+  public void setUpAndExecuteJob(SparkSession sparkSession) {
+    // Read data into data frame.
+    Dataset<Row> dataFrame = sparkSession.read().format(getDataFormat())
+        .load(convertPathsToStrings(_dataPreprocessingHelper._inputDataPaths));
+    JavaRDD<Row> javaRDD = dataFrame.javaRDD();
+
+    // Find positions of partition column and sorting column if specified.
+    StructType schema = dataFrame.schema();
+    StructField[] fields = schema.fields();
+    int partitionColumnPosition = -1;
+    int sortingColumnPosition = -1;
+    PartitionFunction partitionFunction = null;
+    String partitionColumnDefaultNullValue = null;
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      if (_dataPreprocessingHelper._partitionColumn != null && _dataPreprocessingHelper._partitionColumn
+          .equalsIgnoreCase(field.name())) {
+        partitionColumnPosition = i;
+        partitionFunction = PartitionFunctionFactory
+            .getPartitionFunction(_dataPreprocessingHelper._partitionFunction, _dataPreprocessingHelper._numPartitions);
+        partitionColumnDefaultNullValue =
+            _dataPreprocessingHelper._pinotTableSchema.getFieldSpecFor(_dataPreprocessingHelper._partitionColumn)
+                .getDefaultNullValueString();
+      }
+      if (_dataPreprocessingHelper._sortingColumn != null && _dataPreprocessingHelper._sortingColumn
+          .equalsIgnoreCase(field.name())) {
+        sortingColumnPosition = i;
+      }
+    }
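+    // Determine the number of Spark partitions: use the table's partition count when a partition column is present,
+    // otherwise fall back to the configured number of output files or, failing that, the RDD's current partition
+    // count.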
+    int numPartitions;
+    if (partitionColumnPosition == -1) {
+      if (_dataPreprocessingHelper._numOutputFiles > 0) {
+        numPartitions = _dataPreprocessingHelper._numOutputFiles;
+      } else {
+        numPartitions = javaRDD.getNumPartitions();
+      }
+    } else {
+      numPartitions = _dataPreprocessingHelper._numPartitions;
+    }
+    final String finalPartitionColumn = _dataPreprocessingHelper._partitionColumn;
+    final int finalNumPartitions = numPartitions;
+    final int finalPartitionColumnPosition = partitionColumnPosition;
+    final int finalSortingColumnPosition = sortingColumnPosition;
+    LOGGER.info("Partition column: " + finalPartitionColumn);
+    LOGGER.info("Number of partitions: " + finalNumPartitions);
+    LOGGER.info("Position of partition column (if specified): " + finalPartitionColumnPosition);
+    LOGGER.info("Position of sorting column (if specified): " + finalSortingColumnPosition);
+    LOGGER.info("Default null value for partition column: " + partitionColumnDefaultNullValue);
+    SparkDataPreprocessingPartitioner sparkPartitioner =
+        new SparkDataPreprocessingPartitioner(finalPartitionColumn, finalNumPartitions, partitionFunction,
+            partitionColumnDefaultNullValue);
+
+    final String finalSortingColumn = _dataPreprocessingHelper._sortingColumn;
+    // Convert to a pair RDD keyed by (partition id, sorting column value). Only the captured final locals are
+    // referenced inside the lambda so the helper instance does not need to be serialized into the Spark closure.
+    JavaPairRDD<Object, Row> pairRDD = javaRDD.mapToPair((PairFunction<Row, Object, Row>) row -> {
+      Object partitionColumnValue = null;
+      Object sortingColumnValue = null;
+
+      if (finalPartitionColumn != null) {
+        partitionColumnValue = row.get(finalPartitionColumnPosition);
+      }
+      int partitionId = sparkPartitioner.generatePartitionId(partitionColumnValue);
+      if (finalSortingColumn != null) {
+        sortingColumnValue = row.get(finalSortingColumnPosition);
+      }
+      return new Tuple2<>(new SparkDataPreprocessingJobKey(partitionId, sortingColumnValue), row);
+    });
+
+    // Repartition and sort within partitions.
+    Comparator<Object> comparator = new SparkDataPreprocessingComparator();
+    JavaPairRDD<Object, Row> partitionedSortedPairRDD =
+        pairRDD.repartitionAndSortWithinPartitions(sparkPartitioner, comparator);
+
+    // TODO: support preprocessing.max.num.records.per.file before writing back to storage
+    // Write to output path.
+    partitionedSortedPairRDD.values().saveAsTextFile(_dataPreprocessingHelper._outputPath.toString());
+  }
+
+  private Seq<String> convertPathsToStrings(List<Path> paths) {
+    List<String> stringList = new ArrayList<>();
+    for (Path path : paths) {
+      stringList.add(path.toString());
+    }
+    return toSeq(stringList);
+  }
+
+  /**
+   * Helper to wrap a Java collection into a Scala Seq
+   *
+   * @param collection java collection
+   * @param <T> collection item type
+   * @return Scala Seq of type T
+   */
+  public static <T> Seq<T> toSeq(Collection<T> collection) {
+    return JavaConverters.asScalaBufferConverter(new ArrayList<>(collection)).asScala().toSeq();
+  }
+
+  public abstract String getDataFormat();
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
similarity index 63%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
index 43ee97b..cb64bc6 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
@@ -16,44 +16,45 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.spark.jobs.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.fs.Path;
-import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class DataPreprocessingHelperFactory {
-  private DataPreprocessingHelperFactory() {
+public class SparkDataPreprocessingHelperFactory {
+  private SparkDataPreprocessingHelperFactory() {
   }
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelperFactory.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataPreprocessingHelperFactory.class);
 
-  public static DataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
+  public static SparkDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
       throws IOException {
     final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION);
     final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION);
 
     int numAvroFiles = avroFiles.size();
     int numOrcFiles = orcFiles.size();
-    Preconditions
-        .checkState(numAvroFiles == 0 || numOrcFiles == 0,
-            "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
-            inputPaths);
+    Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0,
+        "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
+        inputPaths);
     Preconditions
         .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s",
             inputPaths);
 
     if (numAvroFiles > 0) {
       LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths);
-      return new AvroDataPreprocessingHelper(avroFiles, outputPath);
+      return new SparkAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroFiles, outputPath));
     } else {
       LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths);
-      return new OrcDataPreprocessingHelper(orcFiles, outputPath);
+      return new SparkOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcFiles, outputPath));
     }
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java
new file mode 100644
index 0000000..d4e76a2
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
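+/**
+ * Composite key for the preprocessing pair RDD: the partition id decides which Spark partition a record goes to,
+ * and the sorted column value is used to sort records within each partition.
+ */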
+public class SparkDataPreprocessingJobKey {
+  private final Object _partitionId;
+  private final Object _sortedColumn;
+
+  public SparkDataPreprocessingJobKey(Object partitionId, Object sortedColumn) {
+    _partitionId = partitionId;
+    _sortedColumn = sortedColumn;
+  }
+
+  public Object getPartitionId() {
+    return _partitionId;
+  }
+
+  public Object getSortedColumn() {
+    return _sortedColumn;
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..c7f56ab
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.spark.Partitioner;
+
+
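+/**
+ * Spark {@link Partitioner} that places each record into the partition id pre-computed by
+ * {@link #generatePartitionId(Object)}: round-robin when no partition column is configured, otherwise the configured
+ * Pinot partition function applied to the partition column value (or its default null value when missing).
+ */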
+public class SparkDataPreprocessingPartitioner extends Partitioner {
+  private final String _partitionColumn;
+  private final int _numPartitions;
+  private final PartitionFunction _partitionFunction;
+  private final String _partitionColumnDefaultNullValue;
+  private final AtomicInteger _counter = new AtomicInteger(0);
+
+  public SparkDataPreprocessingPartitioner(String partitionColumn, int numPartitions,
+      PartitionFunction partitionFunction, String partitionColumnDefaultNullValue) {
+    _partitionColumn = partitionColumn;
+    _numPartitions = numPartitions;
+    _partitionFunction = partitionFunction;
+    _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue;
+  }
+
+  @Override
+  public int numPartitions() {
+    return _numPartitions;
+  }
+
+  @Override
+  public int getPartition(Object key) {
+    SparkDataPreprocessingJobKey jobKey = (SparkDataPreprocessingJobKey) key;
+    return (int) jobKey.getPartitionId();
+  }
+
+  public int generatePartitionId(Object key) {
+    if (_partitionColumn == null) {
+      // No partition column is configured (e.g. the job only sorts on a non-primary key), so distribute records
+      // round-robin to spread them evenly across partitions. Math.floorMod keeps the result non-negative even if
+      // the counter overflows.
+      return Math.floorMod(_counter.getAndIncrement(), _numPartitions);
+    }
+    Object keyToPartition = _partitionColumnDefaultNullValue;
+    if (key != null) {
+      keyToPartition = key;
+    }
+    return _partitionFunction.getPartition(keyToPartition);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java
new file mode 100644
index 0000000..3f84aad
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+
+
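+/** Spark preprocessing helper that reads the input files through the "orc" Spark data source. */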
+public class SparkOrcDataPreprocessingHelper extends SparkDataPreprocessingHelper {
+  public SparkOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
+  }
+
+  @Override
+  public String getDataFormat() {
+    return "orc";
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
similarity index 96%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
index 0596259..8ee89cf 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.spark.utils;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;