Support data preprocessing in the Spark framework
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index 85f5c6c..a4b0086 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -18,33 +18,19 @@
*/
package org.apache.pinot.hadoop.job;
-import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelper;
-import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelperFactory;
+import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelper;
+import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelperFactory;
import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
import org.apache.pinot.ingestion.common.ControllerRestApi;
import org.apache.pinot.ingestion.common.JobConfigConstants;
import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableCustomConfig;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,23 +46,6 @@
public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
- private String _partitionColumn;
- private int _numPartitions;
- private String _partitionFunction;
- private String _partitionColumnDefaultNullValue;
-
- private String _sortingColumn;
- private FieldSpec.DataType _sortingColumnType;
- private String _sortingColumnDefaultNullValue;
-
- private int _numOutputFiles;
- private int _maxNumRecordsPerFile;
-
- private TableConfig _tableConfig;
- private org.apache.pinot.spi.data.Schema _pinotTableSchema;
-
- private Set<DataPreprocessingUtils.Operation> _preprocessingOperations;
-
public HadoopSegmentPreprocessingJob(final Properties properties) {
super(properties);
}
@@ -99,8 +68,8 @@
// Cleans up preprocessed output dir if exists
cleanUpPreprocessedOutputs(_preprocessedOutputDir);
- DataPreprocessingHelper dataPreprocessingHelper =
- DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
+ HadoopDataPreprocessingHelper dataPreprocessingHelper =
+ HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
dataPreprocessingHelper
.registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
_partitionColumnDefaultNullValue, _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue,
@@ -130,130 +99,6 @@
LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
}
- private void fetchPreProcessingOperations() {
- _preprocessingOperations = new HashSet<>();
- TableCustomConfig customConfig = _tableConfig.getCustomConfig();
- if (customConfig != null) {
- Map<String, String> customConfigMap = customConfig.getCustomConfigs();
- if (customConfigMap != null && !customConfigMap.isEmpty()) {
- String preprocessingOperationsString =
- customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
- DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString);
- }
- }
- }
-
- private void fetchPartitioningConfig() {
- // Fetch partition info from table config.
- if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) {
- LOGGER.info("Partitioning is disabled.");
- return;
- }
- SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
- if (segmentPartitionConfig != null) {
- Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
- Preconditions
- .checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table.");
- if (columnPartitionMap.size() == 1) {
- _partitionColumn = columnPartitionMap.keySet().iterator().next();
- _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn);
- _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
- _partitionColumnDefaultNullValue =
- _pinotTableSchema.getFieldSpecFor(_partitionColumn).getDefaultNullValueString();
- }
- } else {
- LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
- }
- }
-
- private void fetchSortingConfig() {
- if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
- LOGGER.info("Sorting is disabled.");
- return;
- }
- // Fetch sorting info from table config first.
- List<String> sortingColumns = new ArrayList<>();
- List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
- if (fieldConfigs != null && !fieldConfigs.isEmpty()) {
- for (FieldConfig fieldConfig : fieldConfigs) {
- if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
- sortingColumns.add(fieldConfig.getName());
- }
- }
- }
- if (!sortingColumns.isEmpty()) {
- Preconditions.checkArgument(sortingColumns.size() == 1, "There should be at most 1 sorted column in the table.");
- _sortingColumn = sortingColumns.get(0);
- return;
- }
-
- // There is no sorted column specified in field configs, try to find sorted column from indexing config.
- IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
- List<String> sortedColumns = indexingConfig.getSortedColumn();
- if (sortedColumns != null) {
- Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
- if (sortedColumns.size() == 1) {
- _sortingColumn = sortedColumns.get(0);
- FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
- Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: {} in the schema", _sortingColumn);
- Preconditions
- .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
- _sortingColumnType = fieldSpec.getDataType();
- Preconditions
- .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType,
- _sortingColumn);
- LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
- }
- }
- if (_sortingColumn != null) {
- _sortingColumnDefaultNullValue = _pinotTableSchema.getFieldSpecFor(_sortingColumn).getDefaultNullValueString();
- }
- }
-
- private void fetchResizingConfig() {
- if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) {
- LOGGER.info("Resizing is disabled.");
- return;
- }
- TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
- if (tableCustomConfig == null) {
- _numOutputFiles = 0;
- return;
- }
- Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
- if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) {
- _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
- Preconditions.checkState(_numOutputFiles > 0,
- String.format("The value of %s should be positive! Current value: %s",
- InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles));
- } else {
- _numOutputFiles = 0;
- }
-
- if (customConfigsMap != null) {
- int maxNumRecords;
- if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
- LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.",
- InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE,
- InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE);
- maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
- } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) {
- maxNumRecords =
- Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE));
- } else {
- return;
- }
- // TODO: add a in-built maximum value for this config to avoid having too many small files.
- // E.g. if the config is set to 1 which is smaller than this in-built value, the job should be abort from
- // generating too many small files.
- Preconditions.checkArgument(maxNumRecords > 0,
- "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE
- + " should be positive. Current value: " + maxNumRecords);
- LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
- _maxNumRecordsPerFile = maxNumRecords;
- }
- }
-
@Override
protected Schema getSchema()
throws IOException {
@@ -275,15 +120,6 @@
protected void addAdditionalJobProperties(Job job) {
}
- private void setTableConfigAndSchema()
- throws IOException {
- _tableConfig = getTableConfig();
- _pinotTableSchema = getSchema();
-
- Preconditions.checkState(_tableConfig != null, "Table config cannot be null.");
- Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null");
- }
-
/**
* Cleans up outputs in preprocessed output directory.
*/
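
Note: with the partition/sort/resize config fetching moved up into the framework-agnostic SegmentPreprocessingJob (see the pinot-ingestion-common changes below), the Hadoop driver only has to wire up the Hadoop-specific helper. Below is a minimal sketch of the resulting flow inside HadoopSegmentPreprocessingJob.run(), assuming the inherited fields have already been populated by setTableConfigAndSchema() and the fetch*Config() methods; the final submission call is not part of this hunk and is shown only as the usual MapReduce pattern.

// Sketch of the flow inside HadoopSegmentPreprocessingJob.run() after this change
// (fields such as _tableConfig and _sortingColumn are populated by the base class).
cleanUpPreprocessedOutputs(_preprocessedOutputDir);
HadoopDataPreprocessingHelper helper =
    HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
helper.registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
    _partitionColumnDefaultNullValue, _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue,
    _numOutputFiles, _maxNumRecordsPerFile);
Job job = helper.setUpJob();       // configures mapper, reducer, partitioner and input/output formats
job.waitForCompletion(true);       // assumed submission step, typical for a MapReduce driver
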
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 4291790..ac8e2fd 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -35,9 +35,9 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
import org.apache.pinot.ingestion.common.JobConfigConstants;
import org.apache.pinot.ingestion.jobs.SegmentCreationJob;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig;
import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
deleted file mode 100644
index dc51e71..0000000
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.hadoop.job.preprocess;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
-import org.apache.pinot.hadoop.utils.preprocess.TextComparator;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.IngestionConfigUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public abstract class DataPreprocessingHelper {
- private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class);
-
- String _partitionColumn;
- int _numPartitions;
- String _partitionFunction;
- String _partitionColumnDefaultNullValue;
-
- String _sortingColumn;
- private FieldSpec.DataType _sortingColumnType;
- String _sortingColumnDefaultNullValue;
-
- private int _numOutputFiles;
- private int _maxNumRecordsPerFile;
-
- private TableConfig _tableConfig;
- private Schema _pinotTableSchema;
-
- List<Path> _inputDataPaths;
- Path _sampleRawDataPath;
- Path _outputPath;
-
- public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
- _inputDataPaths = inputDataPaths;
- _sampleRawDataPath = inputDataPaths.get(0);
- _outputPath = outputPath;
- }
-
- public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
- String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
- FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
- int maxNumRecordsPerFile) {
- _tableConfig = tableConfig;
- _pinotTableSchema = tableSchema;
- _partitionColumn = partitionColumn;
- _numPartitions = numPartitions;
- _partitionFunction = partitionFunction;
- _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue;
-
- _sortingColumn = sortingColumn;
- _sortingColumnType = sortingColumnType;
- _sortingColumnDefaultNullValue = sortingColumnDefaultNullValue;
-
- _numOutputFiles = numOutputFiles;
- _maxNumRecordsPerFile = maxNumRecordsPerFile;
- }
-
- public Job setUpJob()
- throws IOException {
- LOGGER.info("Initializing a pre-processing job");
- Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
- Configuration jobConf = job.getConfiguration();
- // Input and output paths.
- int numInputPaths = _inputDataPaths.size();
- jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
- setValidationConfigs(job, _sampleRawDataPath);
- for (Path inputFile : _inputDataPaths) {
- FileInputFormat.addInputPath(job, inputFile);
- }
- setHadoopJobConfigs(job);
-
- // Sorting column
- if (_sortingColumn != null) {
- LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
- jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn);
- jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name());
- jobConf.set(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE, _sortingColumnDefaultNullValue);
-
- switch (_sortingColumnType) {
- case INT:
- job.setMapOutputKeyClass(IntWritable.class);
- break;
- case LONG:
- job.setMapOutputKeyClass(LongWritable.class);
- break;
- case FLOAT:
- job.setMapOutputKeyClass(FloatWritable.class);
- break;
- case DOUBLE:
- job.setMapOutputKeyClass(DoubleWritable.class);
- break;
- case STRING:
- job.setMapOutputKeyClass(Text.class);
- job.setSortComparatorClass(TextComparator.class);
- break;
- default:
- throw new IllegalStateException();
- }
- } else {
- job.setMapOutputKeyClass(NullWritable.class);
- }
-
- // Partition column
- int numReduceTasks = 0;
- if (_partitionColumn != null) {
- numReduceTasks = _numPartitions;
- jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
- job.setPartitionerClass(GenericPartitioner.class);
- jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
- if (_partitionFunction != null) {
- jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
- }
- jobConf.set(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE, _partitionColumnDefaultNullValue);
- jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
- } else {
- if (_numOutputFiles > 0) {
- numReduceTasks = _numOutputFiles;
- } else {
- // default number of input paths
- numReduceTasks = _inputDataPaths.size();
- }
- }
- job.setPartitionerClass(getPartitioner());
- // Maximum number of records per output file
- jobConf
- .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile));
- // Number of reducers
- LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
- job.setNumReduceTasks(numReduceTasks);
-
- setUpMapperReducerConfigs(job);
-
- return job;
- }
-
- abstract Class<? extends Partitioner> getPartitioner();
-
- abstract void setUpMapperReducerConfigs(Job job)
- throws IOException;
-
- abstract String getSampleTimeColumnValue(String timeColumnName)
- throws IOException;
-
- private void setValidationConfigs(Job job, Path path)
- throws IOException {
- SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
-
- // TODO: Serialize and deserialize validation config by creating toJson and fromJson
- // If the use case is an append use case, check that one time unit is contained in one file. If there is more
- // than one,
- // the job should be disabled, as we should not resize for these use cases. Therefore, setting the time column name
- // and value
- if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) {
- job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true");
- String timeColumnName = validationConfig.getTimeColumnName();
- job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName);
- if (timeColumnName != null) {
- DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
- if (dateTimeFieldSpec != null) {
- DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
- job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
- job.getConfiguration()
- .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
- String sdfPattern = formatSpec.getSDFPattern();
- if (sdfPattern != null) {
- job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
- }
- }
- }
- job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
- IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
-
- String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName);
- if (sampleTimeColumnValue != null) {
- job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue);
- }
- }
- }
-
- private void setHadoopJobConfigs(Job job) {
- job.setJarByClass(HadoopSegmentPreprocessingJob.class);
- job.setJobName(getClass().getName());
- FileOutputFormat.setOutputPath(job, _outputPath);
- job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
- // Turn this on to always firstly use class paths that user specifies.
- job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
- // Turn this off since we don't need an empty file in the output directory
- job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
-
- String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
- if (hadoopTokenFileLocation != null) {
- job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
- }
- }
-}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java
new file mode 100644
index 0000000..adaef88
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.mappers.AvroDataPreprocessingMapper;
+import org.apache.pinot.ingestion.preprocess.reducers.AvroDataPreprocessingReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HadoopAvroDataPreprocessingHelper extends HadoopDataPreprocessingHelper {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HadoopAvroDataPreprocessingHelper.class);
+
+ public HadoopAvroDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+ super(dataPreprocessingHelper);
+ }
+
+ @Override
+ public void setUpMapperReducerConfigs(Job job)
+ throws IOException {
+ Schema avroSchema = (Schema) getSchema(_dataPreprocessingHelper._sampleRawDataPath);
+ LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
+ validateConfigsAgainstSchema(avroSchema);
+
+ job.setInputFormatClass(AvroKeyInputFormat.class);
+ job.setMapperClass(AvroDataPreprocessingMapper.class);
+
+ job.setReducerClass(AvroDataPreprocessingReducer.class);
+ AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
+ AvroMultipleOutputs.setCountersEnabled(job, true);
+ // Use LazyOutputFormat to avoid creating empty files.
+ LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+ job.setOutputKeyClass(AvroKey.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ AvroJob.setInputKeySchema(job, avroSchema);
+ AvroJob.setMapOutputValueSchema(job, avroSchema);
+ AvroJob.setOutputKeySchema(job, avroSchema);
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java
new file mode 100644
index 0000000..bf89c62
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.partitioners.GenericPartitioner;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
+import org.apache.pinot.ingestion.utils.preprocess.TextComparator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class HadoopDataPreprocessingHelper implements HadoopJobPreparer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelper.class);
+
+ protected DataPreprocessingHelper _dataPreprocessingHelper;
+
+ public HadoopDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+ _dataPreprocessingHelper = dataPreprocessingHelper;
+ }
+
+ public Job setUpJob()
+ throws IOException {
+ LOGGER.info("Initializing a pre-processing job");
+ Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
+ Configuration jobConf = job.getConfiguration();
+ // Input and output paths.
+ int numInputPaths = _dataPreprocessingHelper._inputDataPaths.size();
+ jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
+ _dataPreprocessingHelper.setValidationConfigs(job, _dataPreprocessingHelper._sampleRawDataPath);
+ for (Path inputFile : _dataPreprocessingHelper._inputDataPaths) {
+ FileInputFormat.addInputPath(job, inputFile);
+ }
+ setHadoopJobConfigs(job);
+
+ // Sorting column
+ if (_dataPreprocessingHelper._sortingColumn != null) {
+ LOGGER.info("Adding sorting column: {} to job config", _dataPreprocessingHelper._sortingColumn);
+ jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _dataPreprocessingHelper._sortingColumn);
+ jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _dataPreprocessingHelper._sortingColumnType.name());
+ jobConf.set(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE,
+ _dataPreprocessingHelper._sortingColumnDefaultNullValue);
+
+ switch (_dataPreprocessingHelper._sortingColumnType) {
+ case INT:
+ job.setMapOutputKeyClass(IntWritable.class);
+ break;
+ case LONG:
+ job.setMapOutputKeyClass(LongWritable.class);
+ break;
+ case FLOAT:
+ job.setMapOutputKeyClass(FloatWritable.class);
+ break;
+ case DOUBLE:
+ job.setMapOutputKeyClass(DoubleWritable.class);
+ break;
+ case STRING:
+ job.setMapOutputKeyClass(Text.class);
+ job.setSortComparatorClass(TextComparator.class);
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ } else {
+ job.setMapOutputKeyClass(NullWritable.class);
+ }
+
+ // Partition column
+ int numReduceTasks = 0;
+ if (_dataPreprocessingHelper._partitionColumn != null) {
+ numReduceTasks = _dataPreprocessingHelper._numPartitions;
+ jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
+ job.setPartitionerClass(GenericPartitioner.class);
+ jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _dataPreprocessingHelper._partitionColumn);
+ if (_dataPreprocessingHelper._partitionFunction != null) {
+ jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _dataPreprocessingHelper._partitionFunction);
+ }
+ jobConf.set(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE,
+ _dataPreprocessingHelper._partitionColumnDefaultNullValue);
+ jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
+ } else {
+ if (_dataPreprocessingHelper._numOutputFiles > 0) {
+ numReduceTasks = _dataPreprocessingHelper._numOutputFiles;
+ } else {
+ // default number of input paths
+ numReduceTasks = _dataPreprocessingHelper._inputDataPaths.size();
+ }
+ }
+ job.setPartitionerClass(_dataPreprocessingHelper.getPartitioner());
+ // Maximum number of records per output file
+ jobConf.set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE,
+ Integer.toString(_dataPreprocessingHelper._maxNumRecordsPerFile));
+ // Number of reducers
+ LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+ job.setNumReduceTasks(numReduceTasks);
+
+ setUpMapperReducerConfigs(job);
+
+ return job;
+ }
+
+ private void setHadoopJobConfigs(Job job) {
+ job.setJarByClass(HadoopSegmentPreprocessingJob.class);
+ job.setJobName(getClass().getName());
+ FileOutputFormat.setOutputPath(job, _dataPreprocessingHelper._outputPath);
+ job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
+ // Turn this on to always firstly use class paths that user specifies.
+ job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
+ // Turn this off since we don't need an empty file in the output directory
+ job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
+
+ String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+ if (hadoopTokenFileLocation != null) {
+ job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
+ }
+ }
+
+ public Object getSchema(Path inputPathDir)
+ throws IOException {
+ return _dataPreprocessingHelper.getSchema(inputPathDir);
+ }
+
+ public void validateConfigsAgainstSchema(Object schema) {
+ _dataPreprocessingHelper.validateConfigsAgainstSchema(schema);
+ }
+
+ public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+ String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
+ FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
+ int maxNumRecordsPerFile) {
+ _dataPreprocessingHelper
+ .registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction,
+ partitionColumnDefaultNullValue,
+ sortingColumn, sortingColumnType, sortingColumnDefaultNullValue, numOutputFiles, maxNumRecordsPerFile);
+ }
+}
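
The wrapper above keeps all MapReduce-specific wiring (the Job, the Writable map-output keys, the partitioner and the reducer count) on the Hadoop side and delegates schema access, validation and config registration to the framework-agnostic DataPreprocessingHelper in pinot-ingestion-common. The same composition should let a Spark-side preparer reuse the shared helper; the skeleton below is a purely hypothetical illustration of that shape (class, package and method names are invented) and is not part of this diff.

package org.apache.pinot.spark.jobs.preprocess; // hypothetical package, for illustration only

import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;

// Hypothetical sketch: a Spark counterpart composes the same framework-agnostic helper and
// keeps only the Spark-specific job wiring in its subclasses.
public abstract class SparkDataPreprocessingHelper {
  protected final DataPreprocessingHelper _dataPreprocessingHelper;

  public SparkDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
    _dataPreprocessingHelper = dataPreprocessingHelper;
  }

  // Same delegation as HadoopDataPreprocessingHelper.registerConfigs(...) above.
  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
      String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
      FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
      int maxNumRecordsPerFile) {
    _dataPreprocessingHelper.registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions,
        partitionFunction, partitionColumnDefaultNullValue, sortingColumn, sortingColumnType,
        sortingColumnDefaultNullValue, numOutputFiles, maxNumRecordsPerFile);
  }

  // Framework-specific: build and run the Spark job that partitions, sorts and resizes the input.
  public abstract void setUpAndExecuteJob(); // hypothetical method name
}
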
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
similarity index 65%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
index 43ee97b..c8dec82 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
@@ -22,38 +22,39 @@
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;
-import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DataPreprocessingHelperFactory {
- private DataPreprocessingHelperFactory() {
+public class HadoopDataPreprocessingHelperFactory {
+ private HadoopDataPreprocessingHelperFactory() {
}
- private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelperFactory.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelperFactory.class);
- public static DataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
+ public static HadoopDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
throws IOException {
final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION);
final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION);
int numAvroFiles = avroFiles.size();
int numOrcFiles = orcFiles.size();
- Preconditions
- .checkState(numAvroFiles == 0 || numOrcFiles == 0,
- "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
- inputPaths);
+ Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0,
+ "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
+ inputPaths);
Preconditions
.checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s",
inputPaths);
if (numAvroFiles > 0) {
LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths);
- return new AvroDataPreprocessingHelper(avroFiles, outputPath);
+ return new HadoopAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroFiles, outputPath));
} else {
LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths);
- return new OrcDataPreprocessingHelper(orcFiles, outputPath);
+ return new HadoopOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcFiles, outputPath));
}
}
}
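
A small usage sketch of the factory, assuming the input directory holds data files of a single supported format; the paths and example class name are illustrative only.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelper;
import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelperFactory;

// Illustrative only: shows which wrapper the factory returns for a given input format.
public class PreprocessingHelperFactoryExample {
  public static void main(String[] args) throws IOException {
    Path inputDir = new Path("hdfs://example/rawdata");         // hypothetical locations
    Path outputDir = new Path("hdfs://example/preprocessed");
    // *.avro input -> HadoopAvroDataPreprocessingHelper (wrapping AvroDataPreprocessingHelper)
    // *.orc input  -> HadoopOrcDataPreprocessingHelper  (wrapping OrcDataPreprocessingHelper)
    // mixed or empty input -> IllegalStateException from the Preconditions checks above
    HadoopDataPreprocessingHelper helper =
        HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(inputDir, outputDir);
    System.out.println(helper.getClass().getSimpleName());
    // registerConfigs(...) and setUpJob() would follow, as in HadoopSegmentPreprocessingJob.run().
  }
}
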
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java
new file mode 100644
index 0000000..c4834cf
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.Job;
+
+
+public interface HadoopJobPreparer {
+
+ void setUpMapperReducerConfigs(Job job) throws IOException;
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java
new file mode 100644
index 0000000..0d9ee78
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.orc.OrcConf;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.orc.mapreduce.OrcInputFormat;
+import org.apache.orc.mapreduce.OrcOutputFormat;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.mappers.OrcDataPreprocessingMapper;
+import org.apache.pinot.ingestion.preprocess.reducers.OrcDataPreprocessingReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HadoopOrcDataPreprocessingHelper extends HadoopDataPreprocessingHelper {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HadoopOrcDataPreprocessingHelper.class);
+
+ public HadoopOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+ super(dataPreprocessingHelper);
+ }
+
+ @Override
+ public void setUpMapperReducerConfigs(Job job)
+ throws IOException {
+ Object orcSchema = getSchema(_dataPreprocessingHelper._sampleRawDataPath);
+ String orcSchemaString = orcSchema.toString();
+ LOGGER.info("Orc schema is: {}", orcSchemaString);
+ validateConfigsAgainstSchema(orcSchema);
+
+ job.setInputFormatClass(OrcInputFormat.class);
+ job.setMapperClass(OrcDataPreprocessingMapper.class);
+ job.setMapOutputValueClass(OrcValue.class);
+ Configuration jobConf = job.getConfiguration();
+ OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString);
+
+ job.setReducerClass(OrcDataPreprocessingReducer.class);
+ // Use LazyOutputFormat to avoid creating empty files.
+ LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(OrcStruct.class);
+ OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString);
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperTest.java
similarity index 78%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperTest.java
index 2f97f72..5f74646 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperTest.java
@@ -19,13 +19,13 @@
package org.apache.pinot.hadoop.job.preprocess;
import com.google.common.base.Preconditions;
+import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -34,6 +34,8 @@
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -41,18 +43,34 @@
import static org.testng.Assert.assertNull;
-public class DataPreprocessingHelperTest {
+public class HadoopDataPreprocessingHelperTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "HadoopDataPreprocessingHelperTest");
+
+ @BeforeClass
+ public static void setUp()
+ throws IOException {
+ String pathString = Preconditions
+ .checkNotNull(
+ HadoopDataPreprocessingHelperTest.class.getClassLoader().getResource("data/test_sample_data.avro"))
+ .getPath();
+
+ // Copy the input path to a temp directory.
+ FileUtils.deleteQuietly(TEMP_DIR);
+ FileUtils.forceMkdir(TEMP_DIR);
+ FileUtils.copyFileToDirectory(new File(pathString), TEMP_DIR);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ FileUtils.deleteQuietly(TEMP_DIR);
+ }
@Test
public void testDataPreprocessingHelper()
throws IOException {
- List<Path> inputPaths = new ArrayList<>();
- String pathString = Preconditions
- .checkNotNull(DataPreprocessingHelperTest.class.getClassLoader().getResource("data/test_sample_data.avro"))
- .getPath();
- inputPaths.add(new Path(pathString));
Path outputPath = new Path("mockOutputPath");
- DataPreprocessingHelper dataPreprocessingHelper = new AvroDataPreprocessingHelper(inputPaths, outputPath);
+ HadoopDataPreprocessingHelper dataPreprocessingHelper =
+ HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(new Path(TEMP_DIR.toString()), outputPath);
BatchIngestionConfig batchIngestionConfig = new BatchIngestionConfig(null, "APPEND", "DAILY");
IngestionConfig ingestionConfig = new IngestionConfig(batchIngestionConfig, null, null, null, null);
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
index 965a98a..3d7b489 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
@@ -117,5 +117,14 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-mapreduce</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
index 70c71e3..df6d462 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
@@ -21,11 +21,25 @@
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pinot.ingestion.common.ControllerRestApi;
import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableCustomConfig;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +53,7 @@
* * enable.preprocessing: false by default. Enables preprocessing job.
*/
public abstract class SegmentPreprocessingJob extends BaseSegmentJob {
- private static final Logger _logger = LoggerFactory.getLogger(SegmentPreprocessingJob.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingJob.class);
protected final Path _schemaFile;
protected final Path _inputSegmentDir;
protected final Path _preprocessedOutputDir;
@@ -47,6 +61,23 @@
protected final Path _pathToDependencyJar;
protected boolean _enablePreprocessing;
+ protected String _partitionColumn;
+ protected int _numPartitions;
+ protected String _partitionFunction;
+ protected String _partitionColumnDefaultNullValue;
+
+ protected String _sortingColumn;
+ protected FieldSpec.DataType _sortingColumnType;
+ protected String _sortingColumnDefaultNullValue;
+
+ protected int _numOutputFiles;
+ protected int _maxNumRecordsPerFile;
+
+ protected TableConfig _tableConfig;
+ protected org.apache.pinot.spi.data.Schema _pinotTableSchema;
+
+ protected Set<DataPreprocessingUtils.Operation> _preprocessingOperations;
+
public SegmentPreprocessingJob(final Properties properties) {
super(properties);
@@ -60,13 +91,13 @@
_pathToDependencyJar = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
_schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA);
- _logger.info("*********************************************************************");
- _logger.info("enable.preprocessing: {}", _enablePreprocessing);
- _logger.info("path.to.input: {}", _inputSegmentDir);
- _logger.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
- _logger.info("path.to.deps.jar: {}", _pathToDependencyJar);
- _logger.info("push.locations: {}", _pushLocations);
- _logger.info("*********************************************************************");
+ LOGGER.info("*********************************************************************");
+ LOGGER.info("enable.preprocessing: {}", _enablePreprocessing);
+ LOGGER.info("path.to.input: {}", _inputSegmentDir);
+ LOGGER.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
+ LOGGER.info("path.to.deps.jar: {}", _pathToDependencyJar);
+ LOGGER.info("push.locations: {}", _pushLocations);
+ LOGGER.info("*********************************************************************");
}
protected abstract void run()
@@ -91,4 +122,137 @@
// TODO: support orc format in the future.
return fileName.endsWith(".avro");
}
+
+ protected void setTableConfigAndSchema()
+ throws IOException {
+ _tableConfig = getTableConfig();
+ _pinotTableSchema = getSchema();
+
+ Preconditions.checkState(_tableConfig != null, "Table config cannot be null.");
+ Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null");
+ }
+
+ protected void fetchPreProcessingOperations() {
+ _preprocessingOperations = new HashSet<>();
+ TableCustomConfig customConfig = _tableConfig.getCustomConfig();
+ if (customConfig != null) {
+ Map<String, String> customConfigMap = customConfig.getCustomConfigs();
+ if (customConfigMap != null && !customConfigMap.isEmpty()) {
+ String preprocessingOperationsString =
+ customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
+ DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString);
+ }
+ }
+ }
+
+ protected void fetchPartitioningConfig() {
+ // Fetch partition info from table config.
+ if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) {
+ LOGGER.info("Partitioning is disabled.");
+ return;
+ }
+ SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+ if (segmentPartitionConfig != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
+ Preconditions
+ .checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table.");
+ if (columnPartitionMap.size() == 1) {
+ _partitionColumn = columnPartitionMap.keySet().iterator().next();
+ _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn);
+ _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
+ _partitionColumnDefaultNullValue =
+ _pinotTableSchema.getFieldSpecFor(_partitionColumn).getDefaultNullValueString();
+ }
+ } else {
+ LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+ }
+ }
+
+ protected void fetchSortingConfig() {
+ if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
+ LOGGER.info("Sorting is disabled.");
+ return;
+ }
+ // Fetch sorting info from table config first.
+ List<String> sortingColumns = new ArrayList<>();
+ List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
+ if (fieldConfigs != null && !fieldConfigs.isEmpty()) {
+ for (FieldConfig fieldConfig : fieldConfigs) {
+ if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
+ sortingColumns.add(fieldConfig.getName());
+ }
+ }
+ }
+ if (!sortingColumns.isEmpty()) {
+ Preconditions.checkArgument(sortingColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
+ _sortingColumn = sortingColumns.get(0);
+ return;
+ }
+
+ // There is no sorted column specified in field configs, try to find sorted column from indexing config.
+ IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+ List<String> sortedColumns = indexingConfig.getSortedColumn();
+ if (sortedColumns != null) {
+ Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
+ if (sortedColumns.size() == 1) {
+ _sortingColumn = sortedColumns.get(0);
+ FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
+      Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: %s in the schema", _sortingColumn);
+ Preconditions
+ .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
+ _sortingColumnType = fieldSpec.getDataType();
+ Preconditions
+ .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType,
+ _sortingColumn);
+ LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
+ }
+ }
+ if (_sortingColumn != null) {
+ _sortingColumnDefaultNullValue = _pinotTableSchema.getFieldSpecFor(_sortingColumn).getDefaultNullValueString();
+ }
+ }
+
+ protected void fetchResizingConfig() {
+ if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) {
+ LOGGER.info("Resizing is disabled.");
+ return;
+ }
+ TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
+ if (tableCustomConfig == null) {
+ _numOutputFiles = 0;
+ return;
+ }
+ Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
+ if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) {
+ _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
+ Preconditions.checkState(_numOutputFiles > 0, String
+ .format("The value of %s should be positive! Current value: %s",
+ InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles));
+ } else {
+ _numOutputFiles = 0;
+ }
+
+ if (customConfigsMap != null) {
+ int maxNumRecords;
+ if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
+ LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.",
+ InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE,
+ InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE);
+ maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
+ } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) {
+ maxNumRecords =
+ Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE));
+ } else {
+ return;
+ }
+      // TODO: add an in-built maximum value for this config to avoid having too many small files.
+      // E.g. if the config is set to 1, which is smaller than this in-built value,
+      // the job should abort instead of generating too many small files.
+ Preconditions.checkArgument(maxNumRecords > 0,
+ "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE
+ + " should be positive. Current value: " + maxNumRecords);
+ LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
+ _maxNumRecordsPerFile = maxNumRecords;
+ }
+ }
}
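
These fetch*Config() methods are driven entirely by the table's customConfigs map. Below is a minimal sketch of the entries that would enable all three operations, assuming PREPROCESS_OPERATIONS takes a comma-separated list as parsed by DataPreprocessingUtils.getOperations(); the values are examples, not recommendations.

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.ingestion.utils.InternalConfigConstants;

// Illustrative only: customConfigs entries consumed by the fetch*Config() methods above.
public class PreprocessingCustomConfigExample {
  public static Map<String, String> exampleCustomConfigs() {
    Map<String, String> customConfigs = new HashMap<>();
    // Which operations to run; the comma-separated value format is an assumption based on
    // DataPreprocessingUtils.getOperations().
    customConfigs.put(InternalConfigConstants.PREPROCESS_OPERATIONS, "partition,sort,resize");
    // Number of reducers, i.e. preprocessed output files (must be positive when set).
    customConfigs.put(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, "8");
    // Cap on records per output file; PARTITION_MAX_RECORDS_PER_FILE is still read but deprecated.
    customConfigs.put(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, "1000000");
    return customConfigs;
  }
}
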
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
similarity index 73%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
index 9e5f5f2..3be4768 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
import com.google.common.base.Preconditions;
import java.io.IOException;
@@ -26,23 +26,13 @@
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper;
-import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner;
-import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.ingestion.preprocess.partitioners.AvroDataPreprocessingPartitioner;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,30 +51,19 @@
}
@Override
- public void setUpMapperReducerConfigs(Job job)
+ public Object getSchema(Path inputPathDir)
throws IOException {
- Schema avroSchema = getAvroSchema(_sampleRawDataPath);
- LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
- validateConfigsAgainstSchema(avroSchema);
-
- job.setInputFormatClass(AvroKeyInputFormat.class);
- job.setMapperClass(AvroDataPreprocessingMapper.class);
-
- job.setReducerClass(AvroDataPreprocessingReducer.class);
- AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
- AvroMultipleOutputs.setCountersEnabled(job, true);
- // Use LazyOutputFormat to avoid creating empty files.
- LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
- job.setOutputKeyClass(AvroKey.class);
- job.setOutputValueClass(NullWritable.class);
-
- AvroJob.setInputKeySchema(job, avroSchema);
- AvroJob.setMapOutputValueSchema(job, avroSchema);
- AvroJob.setOutputKeySchema(job, avroSchema);
+ return getAvroSchema(inputPathDir);
}
@Override
- String getSampleTimeColumnValue(String timeColumnName)
+ public void validateConfigsAgainstSchema(Object schema) {
+ Schema avroSchema = (Schema) schema;
+ validateConfigsAgainstSchema(avroSchema);
+ }
+
+ @Override
+ public String getSampleTimeColumnValue(String timeColumnName)
throws IOException {
String sampleTimeColumnValue;
try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(_sampleRawDataPath)) {
@@ -99,7 +78,7 @@
* @return Input schema
* @throws IOException exception when accessing to IO
*/
- private Schema getAvroSchema(Path inputPathDir)
+ protected Schema getAvroSchema(Path inputPathDir)
throws IOException {
Schema avroSchema = null;
for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) {
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java
new file mode 100644
index 0000000..6ef165f
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.preprocess;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
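+/**
+ * Base helper carrying the partitioning, sorting, and resizing configs shared by the Hadoop and Spark
+ * preprocessing jobs, along with the input/output paths of the data to preprocess. Format-specific
+ * subclasses (Avro, ORC) supply the schema handling and the partitioner to use.
+ */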
+public abstract class DataPreprocessingHelper implements SampleTimeColumnExtractable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class);
+
+ public String _partitionColumn;
+ public int _numPartitions;
+ public String _partitionFunction;
+ public String _partitionColumnDefaultNullValue;
+
+ public String _sortingColumn;
+ public FieldSpec.DataType _sortingColumnType;
+ public String _sortingColumnDefaultNullValue;
+
+ public int _numOutputFiles;
+ public int _maxNumRecordsPerFile;
+
+ public TableConfig _tableConfig;
+ public Schema _pinotTableSchema;
+
+ public List<Path> _inputDataPaths;
+ public Path _sampleRawDataPath;
+ public Path _outputPath;
+
+ public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
+ _inputDataPaths = inputDataPaths;
+ _sampleRawDataPath = inputDataPaths.get(0);
+ _outputPath = outputPath;
+ }
+
+ public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+ String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
+ FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
+ int maxNumRecordsPerFile) {
+ _tableConfig = tableConfig;
+ _pinotTableSchema = tableSchema;
+ _partitionColumn = partitionColumn;
+ _numPartitions = numPartitions;
+ _partitionFunction = partitionFunction;
+ _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue;
+
+ _sortingColumn = sortingColumn;
+ _sortingColumnType = sortingColumnType;
+ _sortingColumnDefaultNullValue = sortingColumnDefaultNullValue;
+
+ _numOutputFiles = numOutputFiles;
+ _maxNumRecordsPerFile = maxNumRecordsPerFile;
+ }
+
+ public abstract Class<? extends Partitioner> getPartitioner();
+
+ abstract public Object getSchema(Path inputPathDir)
+ throws IOException;
+
+ abstract public void validateConfigsAgainstSchema(Object schema);
+
+ public void setValidationConfigs(Job job, Path path)
+ throws IOException {
+ SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+
+    // TODO: Serialize and deserialize the validation config by creating toJson and fromJson methods.
+    // For an APPEND use case, check that each input file contains only one time unit. If a file spans
+    // more than one time unit, the job should be disabled, as we should not resize for such use cases.
+    // Therefore, set the time column name and value in the job configuration.
+ if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) {
+ job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true");
+ String timeColumnName = validationConfig.getTimeColumnName();
+      if (timeColumnName != null) {
+        job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName);
+ DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
+ if (dateTimeFieldSpec != null) {
+ DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+ job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
+ job.getConfiguration()
+ .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
+ String sdfPattern = formatSpec.getSDFPattern();
+ if (sdfPattern != null) {
+ job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
+ }
+ }
+ }
+ job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
+ IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
+
+ String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName);
+ if (sampleTimeColumnValue != null) {
+ job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue);
+ }
+ }
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
similarity index 95%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
index 43ee97b..c89e30d 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;
-import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
similarity index 82%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
index 24c0473..4371cda 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -30,23 +29,13 @@
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
-import org.apache.orc.mapred.OrcStruct;
-import org.apache.orc.mapred.OrcValue;
-import org.apache.orc.mapreduce.OrcInputFormat;
-import org.apache.orc.mapreduce.OrcOutputFormat;
-import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper;
-import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner;
-import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.ingestion.preprocess.partitioners.OrcDataPreprocessingPartitioner;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,33 +51,24 @@
}
@Override
- Class<? extends Partitioner> getPartitioner() {
+ public Class<? extends Partitioner> getPartitioner() {
return OrcDataPreprocessingPartitioner.class;
}
@Override
- void setUpMapperReducerConfigs(Job job) {
- TypeDescription orcSchema = getOrcSchema(_sampleRawDataPath);
- String orcSchemaString = orcSchema.toString();
- LOGGER.info("Orc schema is: {}", orcSchemaString);
- validateConfigsAgainstSchema(orcSchema);
-
- job.setInputFormatClass(OrcInputFormat.class);
- job.setMapperClass(OrcDataPreprocessingMapper.class);
- job.setMapOutputValueClass(OrcValue.class);
- Configuration jobConf = job.getConfiguration();
- OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString);
-
- job.setReducerClass(OrcDataPreprocessingReducer.class);
- // Use LazyOutputFormat to avoid creating empty files.
- LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(OrcStruct.class);
- OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString);
+ public Object getSchema(Path inputPathDir)
+ throws IOException {
+ return getOrcSchema(inputPathDir);
}
@Override
- String getSampleTimeColumnValue(String timeColumnName)
+ public void validateConfigsAgainstSchema(Object schema) {
+ TypeDescription orcSchema = (TypeDescription) schema;
+ validateConfigsAgainstSchema(orcSchema);
+ }
+
+ @Override
+ public String getSampleTimeColumnValue(String timeColumnName)
throws IOException {
try (Reader reader = OrcFile
.createReader(_sampleRawDataPath, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) {
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java
new file mode 100644
index 0000000..feaceee
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.preprocess;
+
+import java.io.IOException;
+
+
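+/**
+ * Extracts a sample value of the time column from the raw input data. The sampled value is used to set
+ * the time column value config for APPEND tables during preprocessing.
+ */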
+public interface SampleTimeColumnExtractable {
+
+ String getSampleTimeColumnValue(String timeColumnName)
+ throws IOException;
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
similarity index 95%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
index d9f17ac..0e5e33b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
import com.google.common.base.Preconditions;
import java.io.IOException;
@@ -27,8 +27,8 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.spi.data.FieldSpec;
import org.slf4j.Logger;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
similarity index 93%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
index 8ad9d84..9180286 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
import com.google.common.base.Preconditions;
import java.io.IOException;
@@ -27,9 +27,9 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
-import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.preprocess.OrcUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
similarity index 97%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
index 5ae8966..3ddd255 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
import com.google.common.base.Preconditions;
import java.io.IOException;
@@ -30,8 +30,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
similarity index 96%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
index 5f2ae4d..a19d24c 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicInteger;
@@ -27,7 +27,7 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.slf4j.Logger;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
similarity index 95%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
index 118d310..0a066c2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
similarity index 95%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
index 853502c..cd36bdf 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
import com.google.common.base.Preconditions;
import java.util.List;
@@ -28,8 +28,8 @@
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.preprocess.OrcUtils;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
similarity index 97%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
index 0ba57db..1826bdc 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
import java.util.HashMap;
import java.util.Map;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
similarity index 96%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
index 62ed4a3..f9a046c 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.reducers;
+package org.apache.pinot.ingestion.preprocess.reducers;
import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
@@ -27,7 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
similarity index 96%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
index a3387a2..10d3f10 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.reducers;
+package org.apache.pinot.ingestion.preprocess.reducers;
import java.io.IOException;
import org.apache.commons.lang3.RandomStringUtils;
@@ -27,7 +27,7 @@
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
similarity index 98%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
index 1998399..0e60d2d 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils;
import java.util.Set;
import org.apache.hadoop.io.DoubleWritable;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
similarity index 98%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
index b26fc38..b8fa59a 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job;
+package org.apache.pinot.ingestion.utils;
/**
* Internal-only constants for Hadoop MapReduce jobs. These constants are propagated across different segment creation
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
similarity index 97%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
index 58e1c1d..a6e4ca7 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
import com.google.common.base.Preconditions;
import java.io.IOException;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
similarity index 95%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
index 0596259..e4cdf5e 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
similarity index 97%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
index dcfc3b5..09eb218 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.io.BooleanWritable;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
similarity index 96%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
index 4def5bd..9e52957 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
index 6427501..20a5734 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
@@ -106,6 +106,10 @@
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-common</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo-shaded</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -282,6 +286,10 @@
</exclusions>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ </dependency>
<!--Test-->
<dependency>
@@ -379,6 +387,14 @@
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
</exclusions>
<scope>test</scope>
</dependency>
@@ -463,8 +479,16 @@
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-mapreduce</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ </exclusion>
</exclusions>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -480,7 +504,7 @@
<artifactId>objenesis</artifactId>
</exclusion>
</exclusions>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java
new file mode 100644
index 0000000..4743d1f
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
+import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelper;
+import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelperFactory;
+import org.apache.pinot.spark.utils.HadoopUtils;
+import org.apache.pinot.spark.utils.PinotSparkJobPreparationHelper;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A Spark job which partitions, sorts, and resizes the input files, which contain raw data in
+ * either Avro or ORC format.
+ * Thus, the output files are partitioned, sorted, and resized after this job.
+ * In order to run this job, the following config needs to be specified in the job properties:
+ * * enable.preprocessing: false by default. Set to true to enable the preprocessing job.
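+ *
+ * A minimal sketch of the job properties (only enable.preprocessing is shown here; the remaining
+ * ingestion properties, such as the input/output directories, are read by the base
+ * {@link SegmentPreprocessingJob}):
+ *   enable.preprocessing=true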
+ */
+public class SparkSegmentPreprocessingJob extends SegmentPreprocessingJob {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkSegmentPreprocessingJob.class);
+
+ public SparkSegmentPreprocessingJob(Properties properties) {
+ super(properties);
+ }
+
+ @Override
+ protected void run()
+ throws Exception {
+ if (!_enablePreprocessing) {
+ LOGGER.info("Pre-processing job is disabled.");
+ return;
+ } else {
+ LOGGER.info("Starting {}", getClass().getSimpleName());
+ }
+
+ setTableConfigAndSchema();
+ fetchPreProcessingOperations();
+ fetchPartitioningConfig();
+ fetchSortingConfig();
+ fetchResizingConfig();
+
+ // Cleans up preprocessed output dir if exists
+ cleanUpPreprocessedOutputs(_preprocessedOutputDir);
+
+ SparkDataPreprocessingHelper dataPreprocessingHelper =
+ SparkDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
+ dataPreprocessingHelper
+ .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
+ _partitionColumnDefaultNullValue, _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue,
+ _numOutputFiles, _maxNumRecordsPerFile);
+
+ // Set up and execute spark job.
+ JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+ addDepsJarToDistributedCache(javaSparkContext);
+
+ SparkSession sparkSession =
+ SparkSession.builder().appName(SparkSegmentPreprocessingJob.class.getSimpleName()).getOrCreate();
+
+ dataPreprocessingHelper.setUpAndExecuteJob(sparkSession);
+ }
+
+ /**
+ * Cleans up outputs in preprocessed output directory.
+ */
+ public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir)
+ throws IOException {
+ if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) {
+ LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir);
+ HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true);
+ }
+ }
+
+ protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext)
+ throws IOException {
+ if (_pathToDependencyJar != null) {
+ PinotSparkJobPreparationHelper
+ .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, sparkContext, _pathToDependencyJar);
+ }
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java
new file mode 100644
index 0000000..23f17e8
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+
+
+public class SparkAvroDataPreprocessingHelper extends SparkDataPreprocessingHelper {
+ public SparkAvroDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+ super(dataPreprocessingHelper);
+ }
+
+ @Override
+ public String getDataFormat() {
+ return "avro";
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java
new file mode 100644
index 0000000..7a7a564
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import com.google.common.collect.Ordering;
+import java.io.Serializable;
+
+
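+/**
+ * Compares {@link SparkDataPreprocessingJobKey}s by their sorted-column value. When no sorting column is
+ * configured, the value is null and all keys compare as equal, effectively disabling the sort.
+ */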
+public class SparkDataPreprocessingComparator extends Ordering<Object> implements Serializable {
+ @Override
+ public int compare(Object left, Object right) {
+ Object value1 = ((SparkDataPreprocessingJobKey) left).getSortedColumn();
+ Object value2 = ((SparkDataPreprocessingJobKey) right).getSortedColumn();
+ if (value1 == null) {
+ return 0;
+ }
+ if (value1 instanceof Integer) {
+ return Integer.compare((int) value1, (int) value2);
+ } else if (value1 instanceof Long) {
+ return Long.compare((long) value1, (long) value2);
+ } else if (value1 instanceof Float) {
+ return Float.compare((float) value1, (float) value2);
+ } else if (value1 instanceof Double) {
+ return Double.compare((double) value1, (double) value2);
+ } else if (value1 instanceof Short) {
+ return Short.compare((short) value1, (short) value2);
+ } else if (value1 instanceof String) {
+ return ((String) value1).compareTo((String) value2);
+ }
+ throw new RuntimeException("Unsupported Data type: " + value1.getClass().getName());
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java
new file mode 100644
index 0000000..a73d9a6
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.partitioners.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+
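+/**
+ * Spark-side wrapper around a format-specific {@link DataPreprocessingHelper}. It loads the input data
+ * into a DataFrame, keys each row by (partition id, sorting column value), and repartitions and sorts the
+ * rows within partitions before writing them back out.
+ */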
+public abstract class SparkDataPreprocessingHelper {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataPreprocessingHelper.class);
+
+ protected DataPreprocessingHelper _dataPreprocessingHelper;
+
+ public SparkDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+ _dataPreprocessingHelper = dataPreprocessingHelper;
+ }
+
+ public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+ String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
+ FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
+ int maxNumRecordsPerFile) {
+ _dataPreprocessingHelper
+ .registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction,
+ partitionColumnDefaultNullValue, sortingColumn, sortingColumnType, sortingColumnDefaultNullValue,
+ numOutputFiles, maxNumRecordsPerFile);
+ }
+
+ public void setUpAndExecuteJob(SparkSession sparkSession) {
+ // Read data into data frame.
+ Dataset<Row> dataFrame = sparkSession.read().format(getDataFormat())
+ .load(convertPathsToStrings(_dataPreprocessingHelper._inputDataPaths));
+ JavaRDD<Row> javaRDD = dataFrame.javaRDD();
+
+ // Find positions of partition column and sorting column if specified.
+ StructType schema = dataFrame.schema();
+ StructField[] fields = schema.fields();
+ int partitionColumnPosition = -1;
+ int sortingColumnPosition = -1;
+ PartitionFunction partitionFunction = null;
+ String partitionColumnDefaultNullValue = null;
+    for (int i = 0; i < fields.length; i++) {
+ StructField field = fields[i];
+ if (_dataPreprocessingHelper._partitionColumn != null && _dataPreprocessingHelper._partitionColumn
+ .equalsIgnoreCase(field.name())) {
+ partitionColumnPosition = i;
+ partitionFunction = PartitionFunctionFactory
+ .getPartitionFunction(_dataPreprocessingHelper._partitionFunction, _dataPreprocessingHelper._numPartitions);
+ partitionColumnDefaultNullValue =
+ _dataPreprocessingHelper._pinotTableSchema.getFieldSpecFor(_dataPreprocessingHelper._partitionColumn)
+ .getDefaultNullValueString();
+ }
+ if (_dataPreprocessingHelper._sortingColumn != null && _dataPreprocessingHelper._sortingColumn
+ .equalsIgnoreCase(field.name())) {
+ sortingColumnPosition = i;
+ }
+ }
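+    // Decide the number of output partitions: without a partition column, use the configured number of
+    // output files if set, otherwise keep the input parallelism; with a partition column, honor the
+    // table's partition count.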
+ int numPartitions;
+ if (partitionColumnPosition == -1) {
+ if (_dataPreprocessingHelper._numOutputFiles > 0) {
+ numPartitions = _dataPreprocessingHelper._numOutputFiles;
+ } else {
+ numPartitions = javaRDD.getNumPartitions();
+ }
+ } else {
+ numPartitions = _dataPreprocessingHelper._numPartitions;
+ }
+ final String finalPartitionColumn = _dataPreprocessingHelper._partitionColumn;
+ final int finalNumPartitions = numPartitions;
+ final int finalPartitionColumnPosition = partitionColumnPosition;
+ final int finalSortingColumnPosition = sortingColumnPosition;
+ LOGGER.info("Partition column: " + finalPartitionColumn);
+ LOGGER.info("Number of partitions: " + finalNumPartitions);
+ LOGGER.info("Position of partition column (if specified): " + finalPartitionColumnPosition);
+ LOGGER.info("Position of sorting column (if specified): " + finalSortingColumnPosition);
+ LOGGER.info("Default null value for partition column: " + partitionColumnDefaultNullValue);
+ SparkDataPreprocessingPartitioner sparkPartitioner =
+ new SparkDataPreprocessingPartitioner(finalPartitionColumn, finalNumPartitions, partitionFunction,
+ partitionColumnDefaultNullValue);
+
+    // Convert to a JavaPairRDD keyed by (partition id, sorting column value).
+ JavaPairRDD<Object, Row> pairRDD = javaRDD.mapToPair((PairFunction<Row, Object, Row>) row -> {
+ Object partitionColumnValue = null;
+ Object sortingColumnValue = null;
+
+ if (_dataPreprocessingHelper._partitionColumn != null) {
+ partitionColumnValue = row.get(finalPartitionColumnPosition);
+ }
+ int partitionId = sparkPartitioner.generatePartitionId(partitionColumnValue);
+ if (_dataPreprocessingHelper._sortingColumn != null) {
+ sortingColumnValue = row.get(finalSortingColumnPosition);
+ }
+ return new Tuple2<>(new SparkDataPreprocessingJobKey(partitionId, sortingColumnValue), row);
+ });
+
+ // Repartition and sort within partitions.
+ Comparator<Object> comparator = new SparkDataPreprocessingComparator();
+ JavaPairRDD<Object, Row> partitionedSortedPairRDD =
+ pairRDD.repartitionAndSortWithinPartitions(sparkPartitioner, comparator);
+
+ // TODO: support preprocessing.max.num.records.per.file before writing back to storage
+ // Write to output path.
+ partitionedSortedPairRDD.values().saveAsTextFile(_dataPreprocessingHelper._outputPath.toString());
+ }
+
+ private Seq<String> convertPathsToStrings(List<Path> paths) {
+ List<String> stringList = new ArrayList<>();
+ for (Path path : paths) {
+ stringList.add(path.toString());
+ }
+ return toSeq(stringList);
+ }
+
+ /**
+ * Helper to wrap a Java collection into a Scala Seq
+ *
+ * @param collection java collection
+ * @param <T> collection item type
+ * @return Scala Seq of type T
+ */
+ public static <T> Seq<T> toSeq(Collection<T> collection) {
+ return JavaConverters.asScalaBufferConverter(new ArrayList<>(collection)).asScala().toSeq();
+ }
+
+ public abstract String getDataFormat();
+}
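For context, a caller of this abstract helper (the Spark preprocessing job defined elsewhere in this patch) is expected to obtain a concrete helper from the factory below, register the table, partition and sort settings, and hand over a SparkSession. A minimal sketch under assumed config values — the column names, the "Murmur" function name and the paths are hypothetical, and tableConfig/tableSchema are assumed to be fetched from the controller as in the Hadoop job:

    // Illustrative call sequence only; not part of the patch.
    SparkSession sparkSession = SparkSession.builder().appName("pinot-spark-preprocessing").getOrCreate();
    SparkDataPreprocessingHelper helper = SparkDataPreprocessingHelperFactory
        .generateDataPreprocessingHelper(new Path("/input/rawdata"), new Path("/output/preprocessed"));
    helper.registerConfigs(tableConfig, tableSchema,
        "memberId",              // partition column (hypothetical)
        8,                       // number of partitions
        "Murmur",                // partition function name
        "0",                     // default null value for the partition column
        "timestamp",             // sorting column (hypothetical)
        FieldSpec.DataType.LONG, // sorting column type
        "0",                     // default null value for the sorting column
        0,                       // numOutputFiles (only used when no partition column is set)
        0);                      // maxNumRecordsPerFile (not yet enforced, see the TODO above)
    helper.setUpAndExecuteJob(sparkSession);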
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
similarity index 63%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
index 43ee97b..cb64bc6 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
@@ -16,44 +16,45 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.spark.jobs.preprocess;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;
-import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DataPreprocessingHelperFactory {
- private DataPreprocessingHelperFactory() {
+public class SparkDataPreprocessingHelperFactory {
+ private SparkDataPreprocessingHelperFactory() {
}
- private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelperFactory.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataPreprocessingHelperFactory.class);
- public static DataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
+ public static SparkDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
throws IOException {
final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION);
final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION);
int numAvroFiles = avroFiles.size();
int numOrcFiles = orcFiles.size();
- Preconditions
- .checkState(numAvroFiles == 0 || numOrcFiles == 0,
- "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
- inputPaths);
+ Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0,
+ "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
+ inputPaths);
Preconditions
.checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s",
inputPaths);
if (numAvroFiles > 0) {
LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths);
- return new AvroDataPreprocessingHelper(avroFiles, outputPath);
+ return new SparkAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroFiles, outputPath));
} else {
LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths);
- return new OrcDataPreprocessingHelper(orcFiles, outputPath);
+ return new SparkOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcFiles, outputPath));
}
}
}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java
new file mode 100644
index 0000000..d4e76a2
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+public class SparkDataPreprocessingJobKey {
+ private final Object _partitionId;
+ private final Object _sortedColumn;
+
+ public SparkDataPreprocessingJobKey(Object partitionId, Object sortedColumn) {
+ _partitionId = partitionId;
+ _sortedColumn = sortedColumn;
+ }
+
+ public Object getPartitionId() {
+ return _partitionId;
+ }
+
+ public Object getSortedColumn() {
+ return _sortedColumn;
+ }
+}
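The key is never compared through equals/hashCode: its partition-id half is read back by the partitioner below, and its sorted-column half is presumably what the comparator passed to repartitionAndSortWithinPartitions orders on. A tiny illustration with hypothetical values:

    // Illustrative only; not part of the patch.
    SparkDataPreprocessingJobKey key = new SparkDataPreprocessingJobKey(3, 1626307200L);
    key.getPartitionId();   // 3 -> reused verbatim by SparkDataPreprocessingPartitioner.getPartition
    key.getSortedColumn();  // 1626307200L -> used when ordering rows within a partition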
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..c7f56ab
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.spark.Partitioner;
+
+
+public class SparkDataPreprocessingPartitioner extends Partitioner {
+ private final String _partitionColumn;
+ private final int _numPartitions;
+ private final PartitionFunction _partitionFunction;
+ private final String _partitionColumnDefaultNullValue;
+ private final AtomicInteger _counter = new AtomicInteger(0);
+
+ public SparkDataPreprocessingPartitioner(String partitionColumn, int numPartitions,
+ PartitionFunction partitionFunction, String partitionColumnDefaultNullValue) {
+ _partitionColumn = partitionColumn;
+ _numPartitions = numPartitions;
+ _partitionFunction = partitionFunction;
+ _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue;
+ }
+
+ @Override
+ public int numPartitions() {
+ return _numPartitions;
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ SparkDataPreprocessingJobKey jobKey = (SparkDataPreprocessingJobKey) key;
+ return (int) jobKey.getPartitionId();
+ }
+
+ public int generatePartitionId(Object key) {
+ if (_partitionColumn == null) {
+      // No partition column is configured (e.g., the data only needs to be sorted),
+      // so spread rows evenly across partitions in round-robin fashion.
+      // Mask the sign bit so the id stays non-negative even if the counter overflows.
+      return (_counter.getAndIncrement() & Integer.MAX_VALUE) % _numPartitions;
+ }
+ Object keyToPartition = _partitionColumnDefaultNullValue;
+ if (key != null) {
+ keyToPartition = key;
+ }
+ return _partitionFunction.getPartition(keyToPartition);
+ }
+}
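A quick illustration of the two modes of generatePartitionId, with hypothetical values: when no partition column is configured the partitioner round-robins rows, otherwise it delegates to the configured Pinot PartitionFunction and falls back to the column's default null value.

    // Illustrative only; the "Murmur" function name and the values are assumptions.
    // Mode 1: no partition column -> round-robin across the requested number of partitions.
    SparkDataPreprocessingPartitioner roundRobin = new SparkDataPreprocessingPartitioner(null, 4, null, null);
    roundRobin.generatePartitionId("ignored"); // 0
    roundRobin.generatePartitionId("ignored"); // 1, then 2, 3, 0, ...

    // Mode 2: partition column configured -> delegate to the partition function,
    // substituting the default null value when the row has no value for the column.
    PartitionFunction murmur = PartitionFunctionFactory.getPartitionFunction("Murmur", 4);
    SparkDataPreprocessingPartitioner byColumn = new SparkDataPreprocessingPartitioner("memberId", 4, murmur, "0");
    int p1 = byColumn.generatePartitionId("member-123"); // partition id computed by the Murmur function
    int p2 = byColumn.generatePartitionId(null);          // same partition as the id computed for "0"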
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java
new file mode 100644
index 0000000..3f84aad
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+
+
+public class SparkOrcDataPreprocessingHelper extends SparkDataPreprocessingHelper {
+ public SparkOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+ super(dataPreprocessingHelper);
+ }
+
+ @Override
+ public String getDataFormat() {
+ return "orc";
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
similarity index 96%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
index 0596259..8ee89cf 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.spark.utils;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;