// blob: aea6b0ba8992504529a49d999d07f59ab8ef1253 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.engine.mr;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Random;
import java.util.UUID;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.dict.ByteComparator;
import org.apache.kylin.dict.BytesConverter;
import org.apache.kylin.dict.IDictionaryValueEnumerator;
import org.apache.kylin.dict.StringBytesConverter;
import org.apache.kylin.dict.TableColumnValueEnumerator;
import org.apache.kylin.metadata.datatype.DataType;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
 * Tests that reading a directory of data files through {@link SortedColumnDFSFile}
 * yields the same sequence of values as loading every line of every file and
 * sorting the result with the comparator that matches the column's data type.
 *
 * <p>The {@code create*TestFiles} methods are {@code @Ignore}d generators for the
 * fixture directories under {@code src/test/resources}; they are run by hand only.
 *
 * Created by xiefan on 16-11-14.
 */
public class SortedColumnReaderTest extends LocalFileMetadataTestCase {

    /** Number of values each fixture generator produces. */
    private static final int VALUE_COUNT = 10000;

    /** Number of files each fixture generator spreads its values over. */
    private static final int FILE_COUNT = 5;

    /**
     * Orders decimal strings by their {@code long} value. A non-numeric string makes
     * the sort fail with {@link NumberFormatException} instead of being silently
     * treated as equal to everything, which would hide a corrupt fixture.
     */
    private static final Comparator<String> BY_LONG_VALUE = new Comparator<String>() {
        @Override
        public int compare(String o1, String o2) {
            return Long.compare(Long.parseLong(o1), Long.parseLong(o2));
        }
    };

    /**
     * Orders decimal strings by their {@code double} value, with the same ordering as
     * {@link Double#compareTo(Double)}. A non-numeric string makes the sort fail with
     * {@link NumberFormatException}.
     */
    private static final Comparator<String> BY_DOUBLE_VALUE = new Comparator<String>() {
        @Override
        public int compare(String o1, String o2) {
            return Double.compare(Double.parseDouble(o1), Double.parseDouble(o2));
        }
    };

    @Before
    public void setup() throws Exception {
        this.createTestMetadata();
    }

    @After
    public void after() throws Exception {
        this.cleanupTestMetadata();
    }

    @Test
    public void testReadStringMultiFile() throws Exception {
        assertReadsInOrder("src/test/resources/multi_file_str", "varchar",
                new ByteComparator<String>(new StringBytesConverter()));
    }

    /** Regenerates the {@code multi_file_str} fixture from random UUID strings. */
    @Ignore
    @Test
    public void createStringTestFiles() throws Exception {
        ArrayList<String> data = new ArrayList<>(VALUE_COUNT);
        for (int i = 0; i < VALUE_COUNT; i++) {
            UUID uuid = RandomUtil.randomUUID();
            data.add(uuid.toString());
        }
        // Sort first so that every generated file is itself in byte order.
        Collections.sort(data, new ByteComparator<String>(new StringBytesConverter()));
        scatterIntoFiles("src/test/resources/multi_file_str", data);
    }

    @Test
    public void testReadIntegerMultiFiles() throws Exception {
        assertReadsInOrder("src/test/resources/multi_file_int", "long", BY_LONG_VALUE);
    }

    /**
     * Reads a directory that contains no data files. The directory is created on
     * demand because it may not exist in a fresh checkout.
     */
    @Test
    public void testEmptyDir() throws Exception {
        String dirPath = "src/test/resources/empty_dir";
        new File(dirPath).mkdirs();
        ArrayList<String> output = readColumn(dirPath, "varchar");
        assertTrue("expected no values from an empty directory but got " + output.size(), output.isEmpty());
    }

    @Test
    public void testEmptyFile() throws Exception {
        assertReadsInOrder("src/test/resources/multi_file_empty_file", "varchar",
                new ByteComparator<String>(new StringBytesConverter()));
    }

    /** Regenerates the {@code multi_file_int} fixture with the integers 0..VALUE_COUNT-1. */
    @Ignore
    @Test
    public void createIntegerTestFiles() throws Exception {
        ArrayList<String> data = new ArrayList<>(VALUE_COUNT);
        for (int i = 0; i < VALUE_COUNT; i++) {
            data.add(String.valueOf(i));
        }
        scatterIntoFiles("src/test/resources/multi_file_int", data);
    }

    @Test
    public void testReadDoubleMultiFiles() throws Exception {
        assertReadsInOrder("src/test/resources/multi_file_double", "double", BY_DOUBLE_VALUE);
    }

    /** Regenerates the {@code multi_file_double} fixture with doubles stepping by 0.52 from 0.0. */
    @Ignore
    @Test
    public void createDoubleTestFiles() throws Exception {
        ArrayList<String> data = new ArrayList<>(VALUE_COUNT);
        double k = 0.0;
        for (int i = 0; i < VALUE_COUNT; i++) {
            data.add(String.valueOf(k));
            k += 0.52;
        }
        scatterIntoFiles("src/test/resources/multi_file_double", data);
    }

    /**
     * Asserts that the values enumerated from {@code dirPath} equal, element by element,
     * all lines of all files in that directory sorted with {@code order}.
     *
     * @param dirPath  fixture directory, relative to the working directory
     * @param typeName data type name handed to {@link DataType#getType(String)}
     * @param order    comparator defining the expected output order
     */
    private void assertReadsInOrder(String dirPath, String typeName, Comparator<? super String> order)
            throws Exception {
        ArrayList<String> expected = readAllFiles(dirPath);
        Collections.sort(expected, order);

        ArrayList<String> actual = readColumn(dirPath, typeName);

        assertEquals("number of values read from " + dirPath, expected.size(), actual.size());
        for (int i = 0; i < expected.size(); i++) {
            assertEquals("value at index " + i + " read from " + dirPath, expected.get(i), actual.get(i));
        }
    }

    /**
     * Enumerates every value of the column stored in {@code dirPath} in the order
     * the enumerator delivers them.
     *
     * @param dirPath  fixture directory, relative to the working directory
     * @param typeName data type name handed to {@link DataType#getType(String)}
     * @return the enumerated values, in enumeration order
     */
    private ArrayList<String> readColumn(String dirPath, String typeName) throws Exception {
        SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType(typeName));
        IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
        ArrayList<String> output = new ArrayList<>();
        while (e.moveNext()) {
            // Conversion kept exactly as in the original test; the type returned by
            // current() is declared outside this file.
            output.add(new String(e.current()));
        }
        return output;
    }

    /**
     * Writes {@code data} into {@code FILE_COUNT} files named {@code data_0..data_N}
     * inside {@code dirPath}, choosing a random target file for each value. Values are
     * appended in list order, so each file holds a subsequence of {@code data} in the
     * same relative order. Finally prints the path and size of every file in the directory.
     *
     * @param dirPath target directory; created if it does not exist
     * @param data    values to write, one per line, UTF-8 encoded
     */
    private void scatterIntoFiles(String dirPath, ArrayList<String> data) throws Exception {
        File dir = new File(dirPath);
        if (!dir.isDirectory() && !dir.mkdirs()) {
            throw new IllegalStateException("Cannot create fixture directory " + dir.getAbsolutePath());
        }

        Random rand = new Random(System.currentTimeMillis());
        ArrayList<BufferedWriter> writers = new ArrayList<>(FILE_COUNT);
        try {
            for (int i = 0; i < FILE_COUNT; i++) {
                // FileOutputStream creates (or truncates) the file, so no createNewFile() is needed.
                File f = new File(dirPath + "/data_" + i);
                writers.add(new BufferedWriter(
                        new OutputStreamWriter(new FileOutputStream(f), StandardCharsets.UTF_8)));
            }
            System.out.println(data.size());
            for (String value : data) {
                BufferedWriter bw = writers.get(rand.nextInt(FILE_COUNT));
                bw.write(value);
                bw.newLine();
            }
        } finally {
            // close() also flushes; every writer is closed even if an earlier step failed.
            closeAll(writers);
        }

        File[] files = dir.listFiles();
        if (files == null) {
            throw new IllegalStateException("Cannot list fixture directory " + dir.getAbsolutePath());
        }
        for (File file : files) {
            System.out.println("file:" + file.getAbsolutePath() + " size:" + file.length());
        }
    }

    /**
     * Closes every writer, continuing past individual failures, and rethrows the
     * first failure once all writers have been attempted.
     */
    private static void closeAll(ArrayList<BufferedWriter> writers) throws Exception {
        Exception firstFailure = null;
        for (BufferedWriter bw : writers) {
            try {
                bw.close();
            } catch (Exception ex) {
                if (firstFailure == null) {
                    firstFailure = ex;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Reads every line of every entry in {@code dirPath} as UTF-8 text.
     *
     * @param dirPath directory to read
     * @return all lines, file by file in the order {@link File#listFiles()} reports them
     * @throws IllegalStateException if {@code dirPath} cannot be listed as a directory
     */
    private ArrayList<String> readAllFiles(String dirPath) throws Exception {
        File dir = new File(dirPath);
        File[] files = dir.listFiles();
        if (files == null) {
            // listFiles() returns null for a missing path or a non-directory.
            throw new IllegalStateException("Cannot list fixture directory " + dir.getAbsolutePath());
        }

        ArrayList<String> result = new ArrayList<>();
        for (File f : files) {
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(new FileInputStream(f), StandardCharsets.UTF_8))) {
                String line = br.readLine();
                while (line != null) {
                    result.add(line);
                    line = br.readLine();
                }
            }
        }
        return result;
    }

    /**
     * Turns a local path into a {@code file://} URI string. Absolute paths that do not
     * start with a slash get a third slash so the result still has an empty authority.
     */
    private String qualify(String path) {
        String absolutePath = new File(path).getAbsolutePath();
        if (absolutePath.startsWith("/")) {
            return "file://" + absolutePath;
        } else {
            return "file:///" + absolutePath;
        }
    }
}