/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.engine.mr;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.dict.ByteComparator;
import org.apache.kylin.dict.StringBytesConverter;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.source.IReadableTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by xiefan on 16-11-22.
 *
 * Reads values from multiple column files and merges them into a single
 * sorted stream using a k-way merge algorithm.
 *
 * The values inside each individual file must already be sorted.
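 *
 * A minimal usage sketch (hypothetical caller code, not from this codebase),
 * assuming a directory of pre-sorted part files holding a bigint column:
 *
 * <pre>
 * IReadableTable table = new SortedColumnDFSFile("/path/to/sorted/parts", DataType.getType("bigint"));
 * IReadableTable.TableReader reader = table.getReader();
 * while (reader.read()) {
 *     String value = reader.getRow()[0];
 *     // values arrive in ascending order across all part files
 * }
 * reader.close();
 * </pre>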
*/
public class SortedColumnDFSFile implements IReadableTable {

    private static final Logger logger = LoggerFactory.getLogger(SortedColumnDFSFile.class);

    private String dfsPath;
    private DFSFileTable dfsFileTable;
    private DataType dataType;

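    /**
     * @param path     DFS path of a sorted file, or of a directory of sorted part files
     * @param dataType column data type; decides how values are compared during the merge
     */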
    public SortedColumnDFSFile(String path, DataType dataType) {
        this.dfsPath = path;
        this.dfsFileTable = new DFSFileTable(path, -1);
        this.dataType = dataType;
    }

    @Override
    public TableReader getReader() throws IOException {
        final Comparator<String> comparator = getComparatorByType(dataType);
        ArrayList<TableReader> readers = new ArrayList<>();
        String filePath = HadoopUtil.fixWindowsPath(dfsPath);
        FileSystem fs = HadoopUtil.getFileSystem(filePath);
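        // The path may point at a single file or at a directory of part files;
        // collect every plain file either way.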
        ArrayList<FileStatus> allFiles = new ArrayList<>();
        FileStatus status = fs.getFileStatus(new Path(filePath));
        if (status.isFile()) {
            allFiles.add(status);
        } else {
            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
            for (FileStatus f : listStatus) {
                if (f.isFile())
                    allFiles.add(f);
            }
        }
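        // Open one reader per file; the merge reader pulls from all of them
        // and always emits the smallest head value next.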
        for (FileStatus f : allFiles) {
            DFSFileTableReader reader = new DFSFileTableReader(f.getPath().toString(), -1);
            readers.add(reader);
        }
        return new SortedColumnDFSFileReader(readers, comparator);
    }

    @Override
    public TableSignature getSignature() throws IOException {
        return dfsFileTable.getSignature();
    }

    @Override
    public boolean exists() throws IOException {
        return dfsFileTable.exists();
    }

    private Comparator<String> getComparatorByType(DataType type) {
        Comparator<String> comparator;
        if (!type.isNumberFamily()) {
            // Non-numeric columns compare by their byte-encoded form.
            comparator = new ByteComparator<>(new StringBytesConverter());
        } else if (type.isIntegerFamily()) {
            // Integer family columns compare as signed 64-bit values.
            comparator = new Comparator<String>() {
                @Override
                public int compare(String str1, String str2) {
                    try {
                        Long num1 = Long.parseLong(str1);
                        Long num2 = Long.parseLong(str2);
                        return num1.compareTo(num2);
                    } catch (NumberFormatException e) {
                        logger.error("NumberFormatException when parsing integer family number. str1: {} str2: {}", str1, str2, e);
                        return 0; // fall back to treating unparsable values as equal
                    }
                }
            };
        } else {
            // Remaining number family columns compare as doubles.
            comparator = new Comparator<String>() {
                @Override
                public int compare(String str1, String str2) {
                    try {
                        Double num1 = Double.parseDouble(str1);
                        Double num2 = Double.parseDouble(str2);
                        return num1.compareTo(num2);
                    } catch (NumberFormatException e) {
                        logger.error("NumberFormatException when parsing double family number. str1: {} str2: {}", str1, str2, e);
                        return 0; // fall back to treating unparsable values as equal
                    }
                }
            };
        }
        return comparator;
    }

    @Override
    public String toString() {
        return dfsPath;
    }
}