blob: d47d5b3df276bdacbc17aeeba3ac7af9bb12a36f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.sdk.file;
import junit.framework.TestCase;
import org.apache.avro.generic.GenericData;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.index.IndexStoreManager;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.Field;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
import org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpression;
import org.apache.carbondata.core.scan.expression.conditional.InExpression;
import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression;
import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression;
import org.apache.carbondata.core.scan.expression.conditional.NotInExpression;
import org.apache.carbondata.core.scan.expression.logical.AndExpression;
import org.apache.carbondata.core.scan.expression.logical.OrExpression;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
import java.io.FileFilter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.carbondata.core.scan.filter.FilterUtil.prepareEqualToExpression;
import static org.apache.carbondata.core.scan.filter.FilterUtil.prepareEqualToExpressionSet;
import static org.apache.carbondata.core.scan.filter.FilterUtil.prepareOrExpression;
public class CarbonReaderTest extends TestCase {
@Test
public void testWriteAndReadFiles() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
CarbonReader reader = CarbonReader.builder(path, "_temp")
.projection(new String[]{"name", "age"}).build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
Assert.assertEquals(("robot" + (i % 10)), row[0]);
Assert.assertEquals(i, row[1]);
i++;
}
Assert.assertEquals(i, 200);
// Read again
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.build();
i = 0;
while (reader2.hasNext()) {
Object[] row = (Object[]) reader2.readNextRow();
Assert.assertEquals(("robot" + (i % 10)), row[0]);
Assert.assertEquals(i, row[1]);
i++;
}
Assert.assertEquals(i, 200);
reader2.close();
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testWriteAndReadJson() throws IOException, InterruptedException {
int numRows = 100;
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
String json = "{\"name\":\"bob\", \"age\":10}";
Schema schema = new Schema(
new Field[]{
new Field("name", "string"),
new Field("age", "int")});
try {
CarbonWriter writer = CarbonWriter.builder().outputPath(path)
.withJsonInput(schema).writtenBy("AvroCarbonWriterTest").build();
for (int i = 0; i < numRows; i++) {
writer.write(json);
}
writer.close();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
File[] dataFiles = new File(path).listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
}
});
Assert.assertNotNull(dataFiles);
Assert.assertEquals(1, dataFiles.length);
// read it and verify
CarbonReader reader = CarbonReader.builder(path, "_temp")
.projection(new String[]{"name", "age"}).build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
Assert.assertEquals("bob", row[0]);
Assert.assertEquals(10, row[1]);
i++;
}
Assert.assertEquals(i, numRows);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test public void testReadWithZeroBatchSize() throws Exception {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance().clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
CarbonReader reader;
reader = CarbonReader.builder(path).withRowRecordReader().withBatch(0).build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
Assert.assertEquals(("robot" + (i % 10)), row[0]);
Assert.assertEquals(i, row[1]);
i++;
}
Assert.assertEquals(i, 10);
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadBatchWithZeroBatchSize() throws Exception {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance().clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
CarbonReader reader;
reader = CarbonReader.builder(path).withRowRecordReader().withBatch(0).build();
int i = 0;
while (reader.hasNext()) {
Object[] row = reader.readNextBatchRow();
Assert.assertEquals(row.length, 10);
i++;
}
Assert.assertEquals(i, 1);
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadWithFilterOfNonTransactionalSimple() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
String path1 = path + "/0testdir";
String path2 = path + "/testdir";
FileUtils.deleteDirectory(new File(path));
FileFactory.getCarbonFile(path1);
FileFactory.mkdirs(path1);
FileFactory.getCarbonFile(path2);
FileFactory.mkdirs(path2);
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
ColumnExpression columnExpression = new ColumnExpression("name", DataTypes.STRING);
EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
new LiteralExpression("robot1", DataTypes.STRING));
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.filter(equalToExpression)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
// Default sort column is applied for dimensions. So, need to validate accordingly
assert ("robot1".equals(row[0]));
i++;
}
Assert.assertEquals(i, 20);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadWithFilterOfNonTransactional2() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
ColumnExpression columnExpression = new ColumnExpression("age", DataTypes.INT);
EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
new LiteralExpression("1", DataTypes.INT));
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.filter(equalToExpression)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
// Default sort column is applied for dimensions. So, need to validate accordingly
assert (((String) row[0]).contains("robot"));
assert (1 == (int) (row[1]));
i++;
}
Assert.assertEquals(i, 1);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadWithFilterOfNonTransactionalAnd() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
fields[2] = new Field("doubleField", DataTypes.DOUBLE);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
new LiteralExpression("3.5", DataTypes.DOUBLE));
ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
new LiteralExpression("robot7", DataTypes.STRING));
AndExpression andExpression = new AndExpression(equalToExpression, equalToExpression2);
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
assert (((String) row[0]).contains("robot7"));
assert (7 == (int) (row[1]));
assert (3.5 == (double) (row[2]));
i++;
}
Assert.assertEquals(i, 1);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadWithFilterOfNonTransactionalOr() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
fields[2] = new Field("doubleField", DataTypes.DOUBLE);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
new LiteralExpression("3.5", DataTypes.DOUBLE));
ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
new LiteralExpression("robot7", DataTypes.STRING));
OrExpression orExpression = new OrExpression(equalToExpression, equalToExpression2);
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(orExpression)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
assert (((String) row[0]).contains("robot7"));
assert (7 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
i++;
}
Assert.assertEquals(i, 20);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadWithFilterOfNonTransactionalGreaterThan() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
fields[2] = new Field("doubleField", DataTypes.DOUBLE);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
GreaterThanExpression greaterThanExpression = new GreaterThanExpression(columnExpression,
new LiteralExpression("13.5", DataTypes.DOUBLE));
ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
new LiteralExpression("robot7", DataTypes.STRING));
AndExpression andExpression = new AndExpression(greaterThanExpression, equalToExpression2);
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
assert (((String) row[0]).contains("robot7"));
assert (7 == ((int) (row[1]) % 10));
assert ((double) row[2] > 13.5);
i++;
}
Assert.assertEquals(i, 17);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadWithFilterEqualSet() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
fields[2] = new Field("doubleField", DataTypes.DOUBLE);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
List<Object> values = new ArrayList<>();
values.add("robot7");
values.add("robot1");
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(prepareEqualToExpressionSet("name", "String", values))
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
if (((String) row[0]).contains("robot7")) {
assert (7 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else if (((String) row[0]).contains("robot1")) {
assert (1 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else {
Assert.assertTrue(false);
}
i++;
}
Assert.assertEquals(i, 40);
reader.close();
List<Object> values2 = new ArrayList<>();
values2.add(1);
values2.add(7);
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(prepareEqualToExpressionSet("age", "int", values2))
.build();
i = 0;
while (reader2.hasNext()) {
Object[] row = (Object[]) reader2.readNextRow();
if (((String) row[0]).contains("robot7")) {
assert (7 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else if (((String) row[0]).contains("robot1")) {
assert (1 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else {
Assert.assertTrue(false);
}
i++;
}
Assert.assertEquals(i, 2);
reader2.close();
List<Object> values3 = new ArrayList<>();
values3.add(0.5);
values3.add(3.5);
CarbonReader reader3 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(prepareEqualToExpressionSet("doubleField", "double", values3))
.build();
i = 0;
while (reader3.hasNext()) {
Object[] row = (Object[]) reader3.readNextRow();
if (((String) row[0]).contains("robot7")) {
assert (7 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else if (((String) row[0]).contains("robot1")) {
assert (1 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else {
Assert.assertTrue(false);
}
i++;
}
Assert.assertEquals(i, 2);
reader3.close();
CarbonReader reader4 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(prepareEqualToExpression("name", "string", "robot7"))
.build();
i = 0;
while (reader4.hasNext()) {
Object[] row = (Object[]) reader4.readNextRow();
if (((String) row[0]).contains("robot7")) {
assert (7 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else {
Assert.assertTrue(false);
}
i++;
}
Assert.assertEquals(i, 20);
reader4.close();
List<Expression> expressions = new ArrayList<>();
expressions.add(prepareEqualToExpression("name", "String", "robot1"));
expressions.add(prepareEqualToExpression("name", "String", "robot7"));
expressions.add(prepareEqualToExpression("age", "int", "2"));
CarbonReader reader5 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(prepareOrExpression(expressions))
.build();
i = 0;
while (reader5.hasNext()) {
Object[] row = (Object[]) reader5.readNextRow();
if (((String) row[0]).contains("robot7")) {
assert (7 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else if (((String) row[0]).contains("robot1")) {
assert (1 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else if (((String) row[0]).contains("robot2")) {
assert (2 == ((int) (row[1]) % 10));
assert (0 == ((double) (row[2]) % 1));
} else {
Assert.assertTrue(false);
}
i++;
}
Assert.assertEquals(i, 41);
reader5.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadWithFilterOfNonTransactionalLessThan() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
fields[2] = new Field("doubleField", DataTypes.DOUBLE);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
LessThanExpression lessThanExpression = new LessThanExpression(columnExpression,
new LiteralExpression("13.5", DataTypes.DOUBLE));
ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
new LiteralExpression("robot7", DataTypes.STRING));
AndExpression andExpression = new AndExpression(lessThanExpression, equalToExpression2);
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
assert (((String) row[0]).contains("robot7"));
assert (7 == ((int) (row[1]) % 10));
assert ((double) row[2] < 13.5);
i++;
}
Assert.assertEquals(i, 2);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadWithFilterOfNonTransactionalNotEqual() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
fields[2] = new Field("doubleField", DataTypes.DOUBLE);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
LessThanExpression lessThanExpression = new LessThanExpression(columnExpression,
new LiteralExpression("13.5", DataTypes.DOUBLE));
ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
NotEqualsExpression notEqualsExpression = new NotEqualsExpression(columnExpression2,
new LiteralExpression("robot7", DataTypes.STRING));
AndExpression andExpression = new AndExpression(lessThanExpression, notEqualsExpression);
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
assert (!((String) row[0]).contains("robot7"));
assert (7 != ((int) (row[1]) % 10));
assert ((double) row[2] < 13.5);
i++;
}
Assert.assertEquals(i, 25);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadWithFilterOfNonTransactionalIn() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
fields[2] = new Field("doubleField", DataTypes.DOUBLE);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
LessThanExpression lessThanExpression = new LessThanExpression(columnExpression,
new LiteralExpression("13.5", DataTypes.DOUBLE));
ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
InExpression inExpression = new InExpression(columnExpression2,
new LiteralExpression("robot7", DataTypes.STRING));
AndExpression andExpression = new AndExpression(lessThanExpression, inExpression);
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
assert (((String) row[0]).contains("robot7"));
assert (7 == ((int) (row[1]) % 10));
assert ((double) row[2] < 13.5);
i++;
}
Assert.assertEquals(i, 2);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadWithFilterOfNonTransactionalNotIn() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
fields[2] = new Field("doubleField", DataTypes.DOUBLE);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
LessThanExpression lessThanExpression = new LessThanExpression(columnExpression,
new LiteralExpression("13.5", DataTypes.DOUBLE));
ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
NotInExpression notInExpression = new NotInExpression(columnExpression2,
new LiteralExpression("robot7", DataTypes.STRING));
AndExpression andExpression = new AndExpression(lessThanExpression, notInExpression);
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
assert (!((String) row[0]).contains("robot7"));
assert (7 != ((int) (row[1]) % 10));
assert ((double) row[2] < 13.5);
i++;
}
Assert.assertEquals(i, 25);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testWriteAndReadFilesWithReaderBuildFail() throws IOException, InterruptedException {
String path1 = "./testWriteFiles";
String path2 = "./testWriteFiles2";
FileUtils.deleteDirectory(new File(path1));
FileUtils.deleteDirectory(new File(path2));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path1), false);
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path2), false);
Field[] fields = new Field[] { new Field("c1", "string"),
new Field("c2", "int") };
Schema schema = new Schema(fields);
CarbonWriterBuilder builder = CarbonWriter.builder();
CarbonWriter carbonWriter = null;
try {
carbonWriter = builder.outputPath(path1).uniqueIdentifier(12345)
.withCsvInput(schema).writtenBy("CarbonReaderTest").build();
} catch (InvalidLoadOptionException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
carbonWriter.write(new String[]{"MNO", "100"});
carbonWriter.close();
Field[] fields1 = new Field[]{new Field("p1", "string"),
new Field("p2", "int")};
Schema schema1 = new Schema(fields1);
CarbonWriterBuilder builder1 = CarbonWriter.builder();
CarbonWriter carbonWriter1 = null;
try {
carbonWriter1 = builder1.outputPath(path2).uniqueIdentifier(12345)
.withCsvInput(schema1).writtenBy("CarbonReaderTest").build();
} catch (InvalidLoadOptionException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
carbonWriter1.write(new String[]{"PQR", "200"});
carbonWriter1.close();
try {
CarbonReader reader =
CarbonReader.builder(path1, "_temp")
.projection(new String[]{"c1", "c3"})
.build();
Assert.fail();
} catch (Exception e) {
System.out.println("Success");
Assert.assertTrue(true);
}
CarbonReader reader1 =
CarbonReader.builder(path2, "_temp1")
.projection(new String[]{"p1", "p2"})
.build();
while (reader1.hasNext()) {
Object[] row1 = (Object[]) reader1.readNextRow();
System.out.println(row1[0]);
System.out.println(row1[1]);
}
reader1.close();
FileUtils.deleteDirectory(new File(path1));
FileUtils.deleteDirectory(new File(path2));
}
@Test
public void testReadColumnTwice() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(100, new Schema(fields), path);
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "age", "name"})
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
// Default sort column is applied for dimensions. So, need to validate accordingly
Assert.assertEquals(("robot" + (i % 10)), row[0]);
Assert.assertEquals(i, row[1]);
Assert.assertEquals(i, row[2]);
Assert.assertEquals("robot" + (i % 10), row[3]);
i++;
}
Assert.assertEquals(i, 100);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
// Below test case was working with transactional table as schema file was present.
// now we don't support transactional table from SDK. only flat folder is supported.
// and currently flat folder will never check for schema files.
@Ignore
public void readFilesParallel() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(100, new Schema(fields), path);
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.build();
// Reader 2
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.build();
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
Object[] row2 = (Object[]) reader2.readNextRow();
// parallel compare
Assert.assertEquals(row[0], row2[0]);
Assert.assertEquals(row[1], row2[1]);
}
reader.close();
reader2.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadAfterClose() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(100, new Schema(fields), path);
CarbonReader reader = CarbonReader.builder(path, "_temp")
.projection(new String[]{"name", "age"}).build();
reader.close();
String msg = "CarbonReader not initialise, please create it first.";
try {
reader.hasNext();
assert (false);
} catch (RuntimeException e) {
assert (e.getMessage().equals(msg));
}
try {
reader.readNextRow();
assert (false);
} catch (RuntimeException e) {
assert (e.getMessage().equals(msg));
}
try {
reader.close();
assert (false);
} catch (RuntimeException e) {
assert (e.getMessage().equals(msg));
}
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testWriteAndReadFilesWithoutTableName() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(100, new Schema(fields), path);
CarbonReader reader = CarbonReader
.builder(path)
.projection(new String[]{"name", "age"})
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
Assert.assertEquals(("robot" + (i % 10)), row[0]);
Assert.assertEquals(i, row[1]);
i++;
}
Assert.assertEquals(i, 100);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testWriteAndReadFilesWithoutTableName2() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
IndexStoreManager.getInstance()
.clearIndexCache(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(new Schema(fields), path);
CarbonReader reader = CarbonReader.builder(path).build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
Assert.assertEquals(("robot" + (i % 10)), row[0]);
Assert.assertEquals(i, row[1]);
i++;
}
Assert.assertEquals(i, 100);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadSchemaFromDataFile() throws IOException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(100, new Schema(fields), path);
File[] dataFiles = new File(path).listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.endsWith("carbondata");
}
});
Assert.assertTrue(dataFiles != null);
Assert.assertTrue(dataFiles.length > 0);
Schema schema = CarbonSchemaReader.readSchema(dataFiles[0].getAbsolutePath());
Assert.assertTrue(schema.getFields().length == 2);
Assert.assertEquals("name", (schema.getFields())[0].getFieldName());
Assert.assertEquals("age", (schema.getFields())[1].getFieldName());
Assert.assertEquals(DataTypes.STRING, (schema.getFields())[0].getDataType());
Assert.assertEquals(DataTypes.INT, (schema.getFields())[1].getDataType());
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testWriteAndReadFilesNonTransactional() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
// Write to a Non Transactional Table
TestUtil.writeFilesAndVerify(new Schema(fields), path);
CarbonReader reader = CarbonReader.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
Assert.assertEquals(("robot" + (i % 10)), row[0]);
Assert.assertEquals(i, row[1]);
i++;
}
Assert.assertEquals(i, 100);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
CarbonProperties carbonProperties;
@Override
public void setUp() {
carbonProperties = CarbonProperties.getInstance();
}
private static final Logger LOGGER =
LogServiceFactory.getLogService(CarbonReaderTest.class.getName());
@Test
public void testTimeStampAndBadRecord() throws IOException, InterruptedException {
String timestampFormat = carbonProperties.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
String badRecordAction = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
String badRecordLoc = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL);
String rootPath = new File(this.getClass().getResource("/").getPath()
+ "../../").getCanonicalPath();
String storeLocation = rootPath + "/target/";
carbonProperties
.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, storeLocation)
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd hh:mm:ss")
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "REDIRECT");
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[9];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("intField", DataTypes.INT);
fields[2] = new Field("shortField", DataTypes.SHORT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
try {
CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("CarbonReaderTest").build();
for (int i = 0; i < 100; i++) {
String[] row = new String[]{
"robot" + (i % 10),
String.valueOf(i),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2018-05-12",
"2018-05-12",
"12.345"
};
writer.write(row);
String[] row2 = new String[]{
"robot" + (i % 10),
String.valueOf(i),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2019-03-02",
"2019-02-12 03:03:34",
"12.345"
};
writer.write(row2);
}
writer.close();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
File folder = new File(path);
Assert.assertTrue(folder.exists());
File[] dataFiles = folder.listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
}
});
Assert.assertNotNull(dataFiles);
Assert.assertTrue(dataFiles.length > 0);
CarbonReader reader = CarbonReader.builder(path, "_temp")
.projection(new String[]{
"stringField"
, "shortField"
, "intField"
, "longField"
, "doubleField"
, "boolField"
, "dateField"
, "timeField"
, "decimalField"})
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
int id = (int) row[2];
Assert.assertEquals("robot" + (id % 10), row[0]);
Assert.assertEquals(Short.parseShort(String.valueOf(id)), row[1]);
Assert.assertEquals(Long.MAX_VALUE - id, row[3]);
Assert.assertEquals((double) id / 2, row[4]);
Assert.assertEquals(true, (boolean) row[5]);
long day = 24L * 3600 * 1000;
Assert.assertEquals("2019-03-02", new Date((day * ((int) row[6]))).toString());
Assert.assertEquals("2019-02-12 03:03:34.0", new Timestamp((long) row[7] / 1000).toString());
i++;
}
Assert.assertEquals(i, 100);
reader.close();
FileUtils.deleteDirectory(new File(path));
carbonProperties.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
timestampFormat);
carbonProperties.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
badRecordAction);
carbonProperties.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
badRecordLoc);
}
@Test
public void testReadSchemaInDataFileAndSort() throws IOException, InterruptedException {
String timestampFormat = carbonProperties.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
String badRecordAction = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
String badRecordLoc = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL);
String rootPath = new File(this.getClass().getResource("/").getPath()
+ "../../").getCanonicalPath();
String storeLocation = rootPath + "/target/";
carbonProperties
.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, storeLocation)
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd hh:mm:ss")
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "REDIRECT");
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[9];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("shortField", DataTypes.SHORT);
fields[2] = new Field("intField", DataTypes.INT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
try {
CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("CarbonReaderTest").build();
for (int i = 0; i < 100; i++) {
String[] row2 = new String[]{
"robot" + (i % 10),
String.valueOf(i),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2019-03-02",
"2019-02-12 03:03:34",
"12.345"
};
writer.write(row2);
}
writer.close();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
File[] dataFiles2 = new File(path).listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.endsWith("carbondata");
}
});
Schema schema = CarbonSchemaReader.readSchema(dataFiles2[0].getAbsolutePath());
// sort the schema
Arrays.sort(schema.getFields(), new Comparator<Field>() {
@Override
public int compare(Field o1, Field o2) {
return Integer.compare(o1.getSchemaOrdinal(), o2.getSchemaOrdinal());
}
});
// Transform the schema
String[] strings = new String[schema.getFields().length];
for (int i = 0; i < schema.getFields().length; i++) {
strings[i] = (schema.getFields())[i].getFieldName();
}
File folder = new File(path);
Assert.assertTrue(folder.exists());
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(strings)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
int id = (int) row[2];
Assert.assertEquals("robot" + (id % 10), row[0]);
Assert.assertEquals(Short.parseShort(String.valueOf(id)), row[1]);
Assert.assertEquals(Long.MAX_VALUE - id, row[3]);
Assert.assertEquals((double) id / 2, row[4]);
Assert.assertEquals(true, (boolean) row[5]);
long day = 24L * 3600 * 1000;
Assert.assertEquals("2019-03-02", new Date((day * ((int) row[6]))).toString());
Assert.assertEquals("2019-02-12 03:03:34.0", new Timestamp((long) row[7] / 1000).toString());
i++;
}
Assert.assertEquals(i, 100);
reader.close();
FileUtils.deleteDirectory(new File(path));
carbonProperties.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
timestampFormat);
carbonProperties.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
badRecordAction);
carbonProperties.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
badRecordLoc);
}
@Test
public void testReadUserSchema() throws IOException, InterruptedException {
String timestampFormat = carbonProperties.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
String badRecordAction = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
String badRecordLoc = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL);
String rootPath = new File(this.getClass().getResource("/").getPath()
+ "../../").getCanonicalPath();
String storeLocation = rootPath + "/target/";
carbonProperties
.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, storeLocation)
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd hh:mm:ss")
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "REDIRECT");
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[9];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("shortField", DataTypes.SHORT);
fields[2] = new Field("intField", DataTypes.INT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
try {
CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path)
.writtenBy("SDK_1.0.0");
CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
for (int i = 0; i < 100; i++) {
String[] row2 = new String[]{
"robot" + (i % 10),
String.valueOf(i),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2019-03-02",
"2019-02-12 03:03:34",
"12.345"
};
writer.write(row2);
}
writer.close();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
File[] dataFiles1 = new File(path).listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.endsWith("carbondata");
}
});
String versionDetails = CarbonSchemaReader.getVersionDetails(dataFiles1[0].getAbsolutePath());
assertTrue(versionDetails.contains("SDK_1.0.0 in version: "));
File[] dataFiles2 = new File(path).listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.endsWith("carbonindex");
}
});
Schema schema = CarbonSchemaReader.readSchema(dataFiles2[0].getAbsolutePath()).asOriginOrder();
// Transform the schema
String[] strings = new String[schema.getFields().length];
for (int i = 0; i < schema.getFields().length; i++) {
strings[i] = (schema.getFields())[i].getFieldName();
}
File folder = new File(path);
Assert.assertTrue(folder.exists());
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(strings)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
int id = (int) row[2];
Assert.assertEquals("robot" + (id % 10), row[0]);
Assert.assertEquals(Short.parseShort(String.valueOf(id)), row[1]);
Assert.assertEquals(Long.MAX_VALUE - id, row[3]);
Assert.assertEquals((double) id / 2, row[4]);
Assert.assertEquals(true, (boolean) row[5]);
long day = 24L * 3600 * 1000;
Assert.assertEquals("2019-03-02", new Date((day * ((int) row[6]))).toString());
Assert.assertEquals("2019-02-12 03:03:34.0", new Timestamp((long) row[7] / 1000).toString());
i++;
}
Assert.assertEquals(i, 100);
reader.close();
FileUtils.deleteDirectory(new File(path));
carbonProperties.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
timestampFormat);
carbonProperties.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
badRecordAction);
carbonProperties.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
badRecordLoc);
}
@Test
public void testReadFilesWithProjectAllColumns() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(100, new Schema(fields), path);
CarbonReader reader = CarbonReader.builder(path, "_temp").build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
Assert.assertEquals(("robot" + (i % 10)), row[0]);
Assert.assertEquals(i, row[1]);
i++;
}
Assert.assertEquals(i, 100);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadFilesWithDefaultProjection() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(100, new Schema(fields), path);
CarbonReader reader = CarbonReader.builder(path, "_temp").build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
Assert.assertEquals(("robot" + (i % 10)), row[0]);
Assert.assertEquals(i, row[1]);
i++;
}
reader.close();
Assert.assertEquals(i, 100);
}
@Test
public void testReadFilesWithNullProjection() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(100, new Schema(fields), path);
try {
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{})
.build();
assert (false);
} catch (RuntimeException e) {
assert (e.getMessage().equalsIgnoreCase("Projection can't be empty"));
}
}
private void WriteAvroComplexData(String mySchema, String json, String path)
throws IOException, InvalidLoadOptionException {
// conversion to GenericData.Record
org.apache.avro.Schema nn = new org.apache.avro.Schema.Parser().parse(mySchema);
GenericData.Record record = TestUtil.jsonToAvro(json, mySchema);
try {
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.withAvroInput(nn).writtenBy("CarbonReaderTest").build();
for (int i = 0; i < 100; i++) {
writer.write(record);
}
writer.close();
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
// TODO: support get schema of complex data type
@Ignore
public void testReadUserSchemaOfComplex() throws IOException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
String mySchema =
"{" +
" \"name\": \"address\", " +
" \"type\": \"record\", " +
" \"fields\": [ " +
" { \"name\": \"name\", \"type\": \"string\"}, " +
" { \"name\": \"age\", \"type\": \"int\"}, " +
" { " +
" \"name\": \"address\", " +
" \"type\": { " +
" \"type\" : \"record\", " +
" \"name\" : \"my_address\", " +
" \"fields\" : [ " +
" {\"name\": \"street\", \"type\": \"string\"}, " +
" {\"name\": \"city\", \"type\": \"string\"} " +
" ]} " +
" }, " +
" {\"name\" :\"doorNum\", " +
" \"type\" : { " +
" \"type\" :\"array\", " +
" \"items\":{ " +
" \"name\" :\"EachdoorNums\", " +
" \"type\" : \"int\", " +
" \"default\":-1} " +
" } " +
" }] " +
"}";
String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}, "
+ " \"doorNum\" : [1,2,3,4]}";
try {
WriteAvroComplexData(mySchema, json, path);
} catch (InvalidLoadOptionException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
File folder = new File(path);
Assert.assertTrue(folder.exists());
File[] dataFiles = folder.listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
}
});
Assert.assertNotNull(dataFiles);
Assert.assertEquals(1, dataFiles.length);
File[] dataFiles2 = new File(path).listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.endsWith("carbonindex");
}
});
Schema schema = CarbonSchemaReader.readSchema(dataFiles2[0].getAbsolutePath()).asOriginOrder();
for (int i = 0; i < schema.getFields().length; i++) {
System.out.println((schema.getFields())[i].getFieldName() + "\t" + schema.getFields()[i].getSchemaOrdinal());
}
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadMapType() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
String mySchema =
"{ " +
" \"name\": \"address\", " +
" \"type\": \"record\", " +
" \"fields\": [ " +
" { " +
" \"name\": \"name\", " +
" \"type\": \"string\" " +
" }, " +
" { " +
" \"name\": \"age\", " +
" \"type\": \"int\" " +
" }, " +
" { " +
" \"name\": \"mapRecord\", " +
" \"type\": { " +
" \"type\": \"map\", " +
" \"values\": \"string\" " +
" } " +
" } " +
" ] " +
"} ";
String json =
"{\"name\":\"bob\", \"age\":10, \"mapRecord\": {\"street\": \"k-lane\", \"city\": \"bangalore\"}}";
try {
WriteAvroComplexData(mySchema, json, path);
} catch (InvalidLoadOptionException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
fields[2] = new Field("mapRecord", DataTypes.createMapType(DataTypes.STRING, DataTypes.STRING));
CarbonReader reader = CarbonReader.builder(path, "_temp").build();
// expected output
String name = "bob";
int age = 10;
Object[] mapKeValue = new Object[2];
mapKeValue[0] = new Object[]{"city", "street"};
mapKeValue[1] = new Object[]{"bangalore", "k-lane"};
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
Assert.assertEquals(name, row[0]);
Assert.assertArrayEquals(mapKeValue, (Object[]) row[1]);
Assert.assertEquals(age, row[2]);
i++;
}
reader.close();
Assert.assertEquals(i, 100);
}
@Test
public void testReadWithFilterOfnonTransactionalwithsubfolders() throws IOException, InterruptedException {
String path1 = "./testWriteFiles/1/" + System.nanoTime();
String path2 = "./testWriteFiles/2/" + System.nanoTime();
String path3 = "./testWriteFiles/3/" + System.nanoTime();
FileUtils.deleteDirectory(new File("./testWriteFiles"));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path1);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path2);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path3);
EqualToExpression equalToExpression = new EqualToExpression(
new ColumnExpression("name", DataTypes.STRING),
new LiteralExpression("robot1", DataTypes.STRING));
CarbonReader reader = CarbonReader
.builder("./testWriteFiles", "_temp")
.projection(new String[]{"name", "age"})
.filter(equalToExpression)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
// Default sort column is applied for dimensions. So, need to validate accordingly
assert ("robot1".equals(row[0]));
i++;
}
Assert.assertEquals(i, 60);
reader.close();
FileUtils.deleteDirectory(new File("./testWriteFiles"));
}
@Test
public void testReadSchemaFromDataFileArrayString() {
String path = "./testWriteFiles";
try {
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[11];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("shortField", DataTypes.SHORT);
fields[2] = new Field("intField", DataTypes.INT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
fields[9] = new Field("varcharField", DataTypes.VARCHAR);
fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
Map<String, String> map = new HashMap<>();
map.put("complex_delimiter_level_1", "#");
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(new Schema(fields))
.writtenBy("CarbonReaderTest")
.build();
for (int i = 0; i < 10; i++) {
String[] row2 = new String[]{
"robot" + (i % 10),
String.valueOf(i % 10000),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2019-03-02",
"2019-02-12 03:03:34",
"12.345",
"varchar",
"Hello#World#From#Carbon"
};
writer.write(row2);
}
writer.close();
File[] dataFiles = new File(path).listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
if (name == null) {
return false;
}
return name.endsWith("carbondata");
}
});
if (dataFiles == null || dataFiles.length < 1) {
throw new RuntimeException("Carbon data file not exists.");
}
Schema schema = CarbonSchemaReader
.readSchema(dataFiles[0].getAbsolutePath())
.asOriginOrder();
// Transform the schema
String[] strings = new String[schema.getFields().length];
for (int i = 0; i < schema.getFields().length; i++) {
strings[i] = (schema.getFields())[i].getFieldName();
}
// Read data
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(strings)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
assert (row[0].equals("robot" + i));
assert (row[2].equals(i));
assert (row[6].equals(17957));
Object[] arr = (Object[]) row[10];
assert (arr[0].equals("Hello"));
assert (arr[3].equals("Carbon"));
i++;
}
reader.close();
FileUtils.deleteDirectory(new File(path));
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
@Test
public void testReadNextRowWithRowUtil() {
String path = "./carbondata";
try {
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[12];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("shortField", DataTypes.SHORT);
fields[2] = new Field("intField", DataTypes.INT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
fields[9] = new Field("varcharField", DataTypes.VARCHAR);
fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
fields[11] = new Field("floatField", DataTypes.FLOAT);
Map<String, String> map = new HashMap<>();
map.put("complex_delimiter_level_1", "#");
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(new Schema(fields))
.writtenBy("CarbonReaderTest")
.build();
for (int i = 0; i < 10; i++) {
String[] row2 = new String[]{
"robot" + (i % 10),
String.valueOf(i % 10000),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2019-03-02",
"2019-02-12 03:03:34",
"12.345",
"varchar",
"Hello#World#From#Carbon",
"1.23"
};
writer.write(row2);
}
writer.close();
File[] dataFiles = new File(path).listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
if (name == null) {
return false;
}
return name.endsWith("carbonindex");
}
});
if (dataFiles == null || dataFiles.length < 1) {
throw new RuntimeException("Carbon index file not exists.");
}
Schema schema = CarbonSchemaReader
.readSchema(dataFiles[0].getAbsolutePath())
.asOriginOrder();
// Transform the schema
int count = 0;
for (int i = 0; i < schema.getFields().length; i++) {
if (!((schema.getFields())[i].getFieldName().contains("."))) {
count++;
}
}
String[] strings = new String[count];
int index = 0;
for (int i = 0; i < schema.getFields().length; i++) {
if (!((schema.getFields())[i].getFieldName().contains("."))) {
strings[index] = (schema.getFields())[i].getFieldName();
index++;
}
}
// Read data
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(strings)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] data = (Object[]) reader.readNextRow();
assert (RowUtil.getString(data, 0).equals("robot" + i));
assertEquals(RowUtil.getShort(data, 1), i);
assertEquals(RowUtil.getInt(data, 2), i);
assertEquals(RowUtil.getLong(data, 3), Long.MAX_VALUE - i);
assertEquals(RowUtil.getDouble(data, 4), ((double) i) / 2);
assert (RowUtil.getBoolean(data, 5));
assertEquals(RowUtil.getInt(data, 6), 17957);
assert (RowUtil.getDecimal(data, 8).equals("12.35"));
assert (RowUtil.getVarchar(data, 9).equals("varchar"));
Object[] arr = RowUtil.getArray(data, 10);
assert (arr[0].equals("Hello"));
assert (arr[1].equals("World"));
assert (arr[2].equals("From"));
assert (arr[3].equals("Carbon"));
assertEquals(RowUtil.getFloat(data, 11), (float) 1.23);
i++;
}
reader.close();
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
} finally {
try {
FileUtils.deleteDirectory(new File(path));
} catch (IOException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
}
@Test
public void testReadNextRowWithProjectionAndRowUtil() {
String path = "./carbondata";
try {
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[12];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("shortField", DataTypes.SHORT);
fields[2] = new Field("intField", DataTypes.INT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
fields[9] = new Field("varcharField", DataTypes.VARCHAR);
fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
fields[11] = new Field("floatField", DataTypes.FLOAT);
Map<String, String> map = new HashMap<>();
map.put("complex_delimiter_level_1", "#");
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(new Schema(fields))
.writtenBy("CarbonReaderTest")
.build();
for (int i = 0; i < 10; i++) {
String[] row2 = new String[]{
"robot" + (i % 10),
String.valueOf(i % 10000),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2019-03-02",
"2019-02-12 03:03:34",
"12.345",
"varchar",
"Hello#World#From#Carbon",
"1.23"
};
writer.write(row2);
}
writer.close();
// Read data
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.withRowRecordReader()
.build();
int i = 0;
while (reader.hasNext()) {
Object[] data = (Object[]) reader.readNextRow();
assert (RowUtil.getString(data, 0).equals("robot" + i));
assertEquals(RowUtil.getInt(data, 1), 17957);
assert (RowUtil.getVarchar(data, 3).equals("varchar"));
Object[] arr = RowUtil.getArray(data, 4);
assert (arr[0].equals("Hello"));
assert (arr[1].equals("World"));
assert (arr[2].equals("From"));
assert (arr[3].equals("Carbon"));
assertEquals(RowUtil.getShort(data, 5), i);
assertEquals(RowUtil.getInt(data, 6), i);
assertEquals(RowUtil.getLong(data, 7), Long.MAX_VALUE - i);
assertEquals(RowUtil.getDouble(data, 8), ((double) i) / 2);
assert (RowUtil.getBoolean(data, 9));
assert (RowUtil.getDecimal(data, 10).equals("12.35"));
assertEquals(RowUtil.getFloat(data, 11), (float) 1.23);
i++;
}
assert (i == 10);
reader.close();
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
} finally {
try {
FileUtils.deleteDirectory(new File(path));
} catch (IOException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
}
@Test
public void testVectorReader() {
String path = "./testWriteFiles";
try {
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[12];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("shortField", DataTypes.SHORT);
fields[2] = new Field("intField", DataTypes.INT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
fields[9] = new Field("varcharField", DataTypes.VARCHAR);
fields[10] = new Field("byteField", DataTypes.BYTE);
fields[11] = new Field("floatField", DataTypes.FLOAT);
Map<String, String> map = new HashMap<>();
map.put("complex_delimiter_level_1", "#");
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(new Schema(fields))
.writtenBy("CarbonReaderTest")
.build();
for (int i = 0; i < 10; i++) {
String[] row2 = new String[]{
"robot" + (i % 10),
String.valueOf(i % 10000),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2019-03-02",
"2019-02-12 03:03:34",
"12.345",
"varchar",
String.valueOf(i),
"1.23"
};
writer.write(row2);
}
writer.close();
// Read data
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.build();
int i = 0;
while (reader.hasNext()) {
Object[] data = (Object[]) reader.readNextRow();
assert (RowUtil.getString(data, 0).equals("robot" + i));
assertEquals(RowUtil.getShort(data, 4), i);
assertEquals(RowUtil.getInt(data, 5), i);
assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i);
assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2);
assert (RowUtil.getBoolean(data, 8));
assertEquals(RowUtil.getInt(data, 1), 17957);
assert (RowUtil.getDecimal(data, 9).equals("12.35"));
assert (RowUtil.getString(data, 3).equals("varchar"));
assertEquals(RowUtil.getByte(data, 10), new Byte(String.valueOf(i)));
assertEquals(RowUtil.getFloat(data, 11), new Float("1.23"));
i++;
}
assert (i == 10);
reader.close();
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
} finally {
try {
FileUtils.deleteDirectory(new File(path));
} catch (IOException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
}
@Test
public void testReadNextBatchRow() {
String path = "./carbondata";
try {
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[12];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("shortField", DataTypes.SHORT);
fields[2] = new Field("intField", DataTypes.INT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
fields[9] = new Field("varcharField", DataTypes.VARCHAR);
fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
fields[11] = new Field("floatField", DataTypes.FLOAT);
Map<String, String> map = new HashMap<>();
map.put("complex_delimiter_level_1", "#");
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(new Schema(fields))
.writtenBy("CarbonReaderTest")
.build();
for (int i = 0; i < 300; i++) {
String[] row2 = new String[]{
"robot" + (i % 10000),
String.valueOf(i % 10000),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2019-03-02",
"2019-02-12 03:03:34",
"12.345",
"varchar",
"Hello#World#From#Carbon",
"1.23"
};
writer.write(row2);
}
writer.close();
// Read data
int batchSize = 150;
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.withBatch(batchSize)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] batch = reader.readNextBatchRow();
Assert.assertTrue(batch.length <= batchSize);
for (int j = 0; j < batch.length; j++) {
Object[] data = (Object[]) batch[j];
assert (RowUtil.getString(data, 0).equals("robot" + i));
assertEquals(RowUtil.getInt(data, 1), 17957);
assert (RowUtil.getVarchar(data, 3).equals("varchar"));
Object[] arr = RowUtil.getArray(data, 4);
assert (arr[0].equals("Hello"));
assert (arr[1].equals("World"));
assert (arr[2].equals("From"));
assert (arr[3].equals("Carbon"));
assertEquals(RowUtil.getShort(data, 5), i);
assertEquals(RowUtil.getInt(data, 6), i);
assertEquals(RowUtil.getLong(data, 7), Long.MAX_VALUE - i);
assertEquals(RowUtil.getDouble(data, 8), ((double) i) / 2);
assert (RowUtil.getBoolean(data, 9));
assert (RowUtil.getDecimal(data, 10).equals("12.35"));
assertEquals(RowUtil.getFloat(data, 11), (float) 1.23);
i++;
}
System.out.println("batch is " + i);
}
reader.close();
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
} finally {
try {
FileUtils.deleteDirectory(new File(path));
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Test
public void testReadNextBatchRowWithVectorReader() {
String path = "./carbondata";
try {
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[11];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("shortField", DataTypes.SHORT);
fields[2] = new Field("intField", DataTypes.INT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
fields[9] = new Field("varcharField", DataTypes.VARCHAR);
// Vector don't support complex data type
// fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
fields[10] = new Field("floatField", DataTypes.FLOAT);
Map<String, String> map = new HashMap<>();
map.put("complex_delimiter_level_1", "#");
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(new Schema(fields))
.writtenBy("CarbonReaderTest")
.build();
for (int i = 0; i < 300; i++) {
String[] row2 = new String[]{
"robot" + (i % 10000),
String.valueOf(i % 10000),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2019-03-02",
"2019-02-12 03:03:34",
"12.345",
"varchar",
"1.23"
};
writer.write(row2);
}
writer.close();
// Read data
int batchSize = 150;
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.withBatch(batchSize)
.build();
int i = 0;
while (reader.hasNext()) {
Object[] batch = reader.readNextBatchRow();
Assert.assertTrue(batch.length <= batchSize);
for (int j = 0; j < batch.length; j++) {
Object[] data = (Object[]) batch[j];
assert (RowUtil.getString(data, 0).equals("robot" + i));
assertEquals(RowUtil.getInt(data, 1), 17957);
assert (RowUtil.getVarchar(data, 3).equals("varchar"));
assertEquals(RowUtil.getShort(data, 4), i);
assertEquals(RowUtil.getInt(data, 5), i);
assertEquals(RowUtil.getLong(data, 6), Long.MAX_VALUE - i);
assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2);
assert (RowUtil.getDecimal(data, 9).equals("12.35"));
assertEquals(RowUtil.getFloat(data, 10), (float) 1.23);
i++;
}
System.out.println("batch is " + i);
}
reader.close();
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
} finally {
try {
FileUtils.deleteDirectory(new File(path));
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Test
public void testReadingNullValues() {
String path = "./testWriteFiles";
try {
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[2];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("booleanField", DataTypes.BOOLEAN);
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.withCsvInput(new Schema(fields))
.writtenBy("CarbonReaderTest")
.build();
for (int i = 0; i < 2; i++) {
String[] row2 = new String[]{
"robot" + (i % 10),
"",
};
writer.write(row2);
}
writer.close();
// Read data
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.build();
int i = 0;
while (reader.hasNext()) {
reader.readNextRow();
i++;
}
assert (i == 2);
reader.close();
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
} finally {
try {
FileUtils.deleteDirectory(new File(path));
} catch (IOException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
}
@Test
public void testSdkWriteWhenArrayOfStringIsEmpty()
throws IOException, InvalidLoadOptionException {
String badRecordAction =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION);
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
String path = "./testSdkWriteWhenArrayOfStringIsEmpty";
String[] rec = {"aaa", "bbb", "aaa@cdf.com", "", "", "mmm", ""};
Field[] fields = new Field[7];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("varcharField", DataTypes.VARCHAR);
fields[2] = new Field("stringField1", DataTypes.STRING);
fields[3] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
fields[4] = new Field("arrayField1", DataTypes.createArrayType(DataTypes.STRING));
fields[5] = new Field("arrayField2", DataTypes.createArrayType(DataTypes.STRING));
fields[6] = new Field("varcharField1", DataTypes.VARCHAR);
Schema schema = new Schema(fields);
Map map = new HashMap();
map.put("complex_delimiter_level_1", "#");
map.put("bad_records_logger_enable", "TRUE");
map.put("bad_record_path", path + "/badrec");
CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
builder.withLoadOptions(map).withCsvInput(schema).enableLocalDictionary(false)
.writtenBy("CarbonReaderTest");
CarbonWriter writer = builder.build();
writer.write(rec);
writer.close();
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, badRecordAction);
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testValidateBadRecordsActionWithImproperValue() throws IOException {
String path = "./testValidateBadRecordsActionValue";
Field[] fields = new Field[2];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("varcharField", DataTypes.VARCHAR);
Schema schema = new Schema(fields);
Map map = new HashMap();
map.put("BAD_RECORDS_ACTION", "FAL");
try {
CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(schema)
.enableLocalDictionary(false)
.writtenBy("CarbonReaderTest")
.build();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains("option BAD_RECORDS_ACTION can have only either " +
"FORCE or IGNORE or REDIRECT or FAIL. It shouldn't be FAL"));
} catch (Exception e) {
Assert.fail();
} finally {
FileUtils.deleteDirectory(new File(path));
}
}
@Test
public void testValidateBadRecordsActionWithProperValue() throws IOException {
String path = "./testValidateBadRecordsActionValue";
Field[] fields = new Field[2];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("varcharField", DataTypes.VARCHAR);
Schema schema = new Schema(fields);
Map map = new HashMap();
map.put("BAD_RECORDS_ACTION", "FAIL");
try {
CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(schema)
.enableLocalDictionary(false)
.writtenBy("CarbonReaderTest")
.build();
} catch (IllegalArgumentException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
} catch (Exception e) {
Assert.fail(e.getMessage());
} finally {
FileUtils.deleteDirectory(new File(path));
}
}
@Test
public void testValidateBadRecordsLoggerEnableWithImproperValue() throws IOException {
String path = "./testValidateBadRecordsLoggerEnableValue";
Field[] fields = new Field[2];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("varcharField", DataTypes.VARCHAR);
Schema schema = new Schema(fields);
Map map = new HashMap();
map.put("bad_records_logger_enable", "FLSE");
try {
CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(schema)
.enableLocalDictionary(false)
.writtenBy("CarbonReaderTest")
.build();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
"Invalid value FLSE for key bad_records_logger_enable"));
} catch (Exception e) {
Assert.fail(e.getMessage());
} finally {
FileUtils.deleteDirectory(new File(path));
}
}
@Test
public void testValidateBadRecordsLoggerEnableWithProperValue() throws IOException {
String path = "./testValidateBadRecordsLoggerEnableValue";
Field[] fields = new Field[2];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("varcharField", DataTypes.VARCHAR);
Schema schema = new Schema(fields);
Map map = new HashMap();
map.put("bad_records_logger_enable", "FALSE");
try {
CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(schema)
.enableLocalDictionary(false)
.writtenBy("CarbonReaderTest")
.build();
} catch (IllegalArgumentException e) {
e.printStackTrace();
Assert.fail();
} catch (Exception e) {
Assert.fail(e.getMessage());
} finally {
FileUtils.deleteDirectory(new File(path));
}
}
@Test
public void testValidateQuoteCharWithImproperValue() throws IOException {
String path = "./testValidateQuoteCharWithImproperValue";
Field[] fields = new Field[2];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("varcharField", DataTypes.VARCHAR);
Schema schema = new Schema(fields);
Map map = new HashMap();
map.put("quotechar", "##");
try {
CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(schema)
.enableLocalDictionary(false)
.writtenBy("CarbonReaderTest")
.build();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
"QUOTECHAR cannot be more than one character."));
} catch (Exception e) {
Assert.fail(e.getMessage());
} finally {
FileUtils.deleteDirectory(new File(path));
}
}
@Test
public void testValidateQuoteCharWithProperValue() throws IOException {
String path = "./testValidateQuoteCharWithProperValue";
Field[] fields = new Field[2];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("varcharField", DataTypes.VARCHAR);
Schema schema = new Schema(fields);
Map map = new HashMap();
map.put("quotechar", "#");
try {
CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(schema)
.enableLocalDictionary(false)
.writtenBy("CarbonReaderTest")
.build();
} catch (IllegalArgumentException e) {
e.printStackTrace();
Assert.fail();
} catch (Exception e) {
Assert.fail(e.getMessage());
} finally {
FileUtils.deleteDirectory(new File(path));
}
}
@Test
public void testValidateEscapeCharWithImproperValue() throws IOException {
String path = "./testValidateEscapeCharWithImproperValue";
Field[] fields = new Field[2];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("varcharField", DataTypes.VARCHAR);
Schema schema = new Schema(fields);
Map map = new HashMap();
map.put("escapechar", "##");
try {
CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(schema)
.enableLocalDictionary(false)
.writtenBy("CarbonReaderTest")
.build();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
"ESCAPECHAR cannot be more than one character."));
} catch (Exception e) {
Assert.fail(e.getMessage());
} finally {
FileUtils.deleteDirectory(new File(path));
}
}
@Test
public void testValidateEscapeCharWithProperValue() throws IOException {
String path = "./testValidateEscapeCharWithProperValue";
Field[] fields = new Field[2];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("varcharField", DataTypes.VARCHAR);
Schema schema = new Schema(fields);
Map map = new HashMap();
map.put("escapechar", "#");
try {
CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(schema)
.enableLocalDictionary(false)
.writtenBy("CarbonReaderTest")
.build();
} catch (IllegalArgumentException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
} catch (Exception e) {
Assert.fail(e.getMessage());
} finally {
FileUtils.deleteDirectory(new File(path));
}
}
@Test
public void testWriteWithDifferentDataType() {
String path = "./carbondata";
try {
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[13];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("shortField", DataTypes.SHORT);
fields[2] = new Field("intField", DataTypes.INT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
fields[9] = new Field("varcharField", DataTypes.VARCHAR);
fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
fields[11] = new Field("floatField", DataTypes.FLOAT);
fields[12] = new Field("binaryField", DataTypes.BINARY);
Map<String, String> map = new HashMap<>();
map.put("complex_delimiter_level_1", "#");
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.withLoadOptions(map)
.withCsvInput(new Schema(fields))
.writtenBy("CarbonReaderTest")
.build();
byte[] value = "Binary".getBytes();
for (int i = 0; i < 10; i++) {
Object[] row2 = new Object[]{
"robot" + (i % 10),
i % 10000,
i,
(Long.MAX_VALUE - i),
((double) i / 2),
(true),
"2019-03-02",
"2019-02-12 03:03:34",
12.345,
"varchar",
"Hello#World#From#Carbon",
1.23,
value
};
writer.write(row2);
}
writer.close();
// Read data
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.withRowRecordReader()
.build();
int i = 0;
while (reader.hasNext()) {
Object[] data = (Object[]) reader.readNextRow();
assert (RowUtil.getString(data, 0).equals("robot" + i));
assertEquals(RowUtil.getInt(data, 1), 17957);
Assert.assertEquals(new String(value), new String(RowUtil.getBinary(data, 3)));
assert (RowUtil.getVarchar(data, 4).equals("varchar"));
Object[] arr = RowUtil.getArray(data, 5);
assert (arr[0].equals("Hello"));
assert (arr[1].equals("World"));
assert (arr[2].equals("From"));
assert (arr[3].equals("Carbon"));
assertEquals(RowUtil.getShort(data, 6), i);
assertEquals(RowUtil.getInt(data, 7), i);
assertEquals(RowUtil.getLong(data, 8), Long.MAX_VALUE - i);
assertEquals(RowUtil.getDouble(data, 9), ((double) i) / 2);
assert (RowUtil.getBoolean(data, 10));
assert (RowUtil.getDecimal(data, 11).equals("12.35"));
assertEquals(RowUtil.getFloat(data, 12), (float) 1.23);
i++;
}
assert (i == 10);
reader.close();
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
} finally {
try {
FileUtils.deleteDirectory(new File(path));
} catch (IOException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
}
@Test
public void testReadBlocklet() throws IOException, InterruptedException {
String path = "./testWriteFiles/" + System.nanoTime();
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 1, 100);
InputSplit[] splits = CarbonReader.builder(path).getSplits(true);
// check for 3 blocklet count (as only one carbon file will be created)
Assert.assertEquals(splits.length, 3);
int totalCount = 0;
for (int k = 0; k < splits.length; k++) {
CarbonReader reader = CarbonReader
.builder(splits[k])
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
i++;
}
totalCount += i;
reader.close();
}
Assert.assertEquals(totalCount, 1000000);
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testGetSplits() throws IOException, InterruptedException {
String path = "./testWriteFiles/" + System.nanoTime();
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 1, 100);
InputSplit[] splits = CarbonReader.builder(path).getSplits(true);
// check for 3 blocklet count (as only one carbon file will be created)
Assert.assertEquals(splits.length, 3);
InputSplit[] splits1 = CarbonReader.builder(path).getSplits(false);
// check for 1 block count (as only one carbon file will be created)
Assert.assertEquals(splits1.length, 1);
FileUtils.deleteDirectory(new File(path));
}
@Test
public void testReadWithFilterNonResult() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
ColumnExpression columnExpression = new ColumnExpression("age", DataTypes.INT);
EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
new LiteralExpression("-11", DataTypes.INT));
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.filter(equalToExpression)
.build();
int i = 0;
while (reader.hasNext()) {
Assert.assertTrue(false);
i++;
}
Assert.assertEquals(i, 0);
reader.close();
FileUtils.deleteDirectory(new File(path));
}
}