| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.sdk.file; |
| |
| import java.io.File; |
| import java.io.FileFilter; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; |
| import org.apache.carbondata.core.metadata.datatype.DataTypes; |
| import org.apache.carbondata.core.metadata.datatype.Field; |
| import org.apache.carbondata.core.metadata.schema.SchemaReader; |
| import org.apache.carbondata.core.metadata.schema.table.TableInfo; |
| import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; |
| |
| import org.apache.avro.generic.GenericData; |
| import org.apache.commons.io.FileUtils; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import org.apache.avro.Schema; |
| |
| public class AvroCarbonWriterTest { |
| private String path = "./AvroCarbonWriterSuiteWriteFiles"; |
| |
| @Test |
| public void testWriteBasic() throws IOException { |
| FileUtils.deleteDirectory(new File(path)); |
| |
| // Avro schema |
| String avroSchema = |
| "{" + |
| " \"type\" : \"record\"," + |
| " \"name\" : \"Acme\"," + |
| " \"fields\" : [" |
| + "{ \"name\" : \"name\", \"type\" : \"string\" }," |
| + "{ \"name\" : \"age\", \"type\" : \"int\" }]" + |
| "}"; |
| |
| String json = "{\"name\":\"bob\", \"age\":10}"; |
| |
| // conversion to GenericData.Record |
| GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema); |
| try { |
| CarbonWriter writer = CarbonWriter.builder().outputPath(path) |
| .withAvroInput(new Schema.Parser().parse(avroSchema)).writtenBy("AvroCarbonWriterTest").build(); |
| |
| for (int i = 0; i < 100; i++) { |
| writer.write(record); |
| } |
| writer.close(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail(e.getMessage()); |
| } |
| |
| File[] dataFiles = new File(path).listFiles(new FileFilter() { |
| @Override |
| public boolean accept(File pathname) { |
| return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); |
| } |
| }); |
| Assert.assertNotNull(dataFiles); |
| Assert.assertEquals(1, dataFiles.length); |
| |
| FileUtils.deleteDirectory(new File(path)); |
| } |
| |
| @Test |
| public void testWriteAllPrimitive() throws IOException { |
| FileUtils.deleteDirectory(new File(path)); |
| |
| // Avro schema |
| // Supported Primitive Datatype. |
| // 1. Boolean |
| // 2. Int |
| // 3. long |
| // 4. float -> To carbon Internally it is double. |
| // 5. double |
| // 6. String |
| |
| // Not Supported |
| // 1.NULL Datatype |
| // 2.Bytes |
| |
| String avroSchema = "{\n" + " \"name\" : \"myrecord\",\n" |
| + " \"namespace\": \"org.apache.parquet.avro\",\n" + " \"type\" : \"record\",\n" |
| + " \"fields\" : [ " |
| + " {\n" + " \"name\" : \"myboolean\",\n" + " \"type\" : \"boolean\"\n }," |
| + " {\n" + " \"name\" : \"myint\",\n" + " \"type\" : \"int\"\n" + " }, " |
| + " {\n \"name\" : \"mylong\",\n" + " \"type\" : \"long\"\n" + " }," |
| + " {\n \"name\" : \"myfloat\",\n" + " \"type\" : \"float\"\n" + " }, " |
| + " {\n \"name\" : \"mydouble\",\n" + " \"type\" : \"double\"\n" + " }," |
| + " {\n \"name\" : \"mystring\",\n" + " \"type\" : \"string\"\n" + " }\n" + "] }"; |
| |
| String json = "{" |
| + "\"myboolean\":true, " |
| + "\"myint\": 10, " |
| + "\"mylong\": 7775656565," |
| + " \"myfloat\": 0.2, " |
| + "\"mydouble\": 44.56, " |
| + "\"mystring\":\"Ajantha\"}"; |
| |
| |
| // conversion to GenericData.Record |
| GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema); |
| |
| try { |
| CarbonWriter writer = CarbonWriter.builder() |
| .outputPath(path) |
| |
| .withAvroInput(new Schema.Parser().parse(avroSchema)).writtenBy("AvroCarbonWriterTest").build(); |
| |
| for (int i = 0; i < 100; i++) { |
| writer.write(record); |
| } |
| writer.close(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail(e.getMessage()); |
| } |
| |
| File[] dataFiles = new File(path).listFiles(new FileFilter() { |
| @Override |
| public boolean accept(File pathname) { |
| return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); |
| } |
| }); |
| |
| Assert.assertNotNull(dataFiles); |
| Assert.assertEquals(1, dataFiles.length); |
| |
| FileUtils.deleteDirectory(new File(path)); |
| } |
| |
| |
| @Test |
| public void testWriteNestedRecord() throws IOException { |
| FileUtils.deleteDirectory(new File(path)); |
| |
| String newAvroSchema = |
| "{" + |
| " \"type\" : \"record\", " + |
| " \"name\" : \"userInfo\", " + |
| " \"namespace\" : \"my.example\", " + |
| " \"fields\" : [{\"name\" : \"username\", " + |
| " \"type\" : \"string\", " + |
| " \"default\" : \"NONE\"}, " + |
| |
| " {\"name\" : \"age\", " + |
| " \"type\" : \"int\", " + |
| " \"default\" : -1}, " + |
| |
| "{\"name\" : \"address\", " + |
| " \"type\" : { " + |
| " \"type\" : \"record\", " + |
| " \"name\" : \"mailing_address\", " + |
| " \"fields\" : [ {" + |
| " \"name\" : \"street\", " + |
| " \"type\" : \"string\", " + |
| " \"default\" : \"NONE\"}, { " + |
| |
| " \"name\" : \"city\", " + |
| " \"type\" : \"string\", " + |
| " \"default\" : \"NONE\"}, " + |
| " ]}, " + |
| " \"default\" : {} " + |
| " } " + |
| "}"; |
| |
| String mySchema = |
| "{" + |
| " \"name\": \"address\", " + |
| " \"type\": \"record\", " + |
| " \"fields\": [ " + |
| " { \"name\": \"name\", \"type\": \"string\"}, " + |
| " { \"name\": \"age\", \"type\": \"int\"}, " + |
| " { " + |
| " \"name\": \"address\", " + |
| " \"type\": { " + |
| " \"type\" : \"record\", " + |
| " \"name\" : \"my_address\", " + |
| " \"fields\" : [ " + |
| " {\"name\": \"street\", \"type\": \"string\"}, " + |
| " {\"name\": \"city\", \"type\": \"string\"} " + |
| " ]} " + |
| " } " + |
| "] " + |
| "}"; |
| |
| String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}}"; |
| |
| |
| // conversion to GenericData.Record |
| Schema nn = new Schema.Parser().parse(mySchema); |
| GenericData.Record record = TestUtil.jsonToAvro(json, mySchema); |
| |
| try { |
| CarbonWriter writer = CarbonWriter.builder().outputPath(path).withAvroInput(nn).writtenBy("AvroCarbonWriterTest").build(); |
| for (int i = 0; i < 100; i++) { |
| writer.write(record); |
| } |
| writer.close(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail(e.getMessage()); |
| } |
| |
| File[] dataFiles = new File(path).listFiles(new FileFilter() { |
| @Override |
| public boolean accept(File pathname) { |
| return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); |
| } |
| }); |
| Assert.assertNotNull(dataFiles); |
| Assert.assertEquals(1, dataFiles.length); |
| |
| FileUtils.deleteDirectory(new File(path)); |
| } |
| |
| |
| @Test |
| public void testWriteNestedRecordWithMeasure() throws IOException { |
| FileUtils.deleteDirectory(new File(path)); |
| |
| String mySchema = |
| "{" + |
| " \"name\": \"address\", " + |
| " \"type\": \"record\", " + |
| " \"fields\": [ " + |
| " { \"name\": \"name\", \"type\": \"string\"}, " + |
| " { \"name\": \"age\", \"type\": \"int\"}, " + |
| " { " + |
| " \"name\": \"address\", " + |
| " \"type\": { " + |
| " \"type\" : \"record\", " + |
| " \"name\" : \"my_address\", " + |
| " \"fields\" : [ " + |
| " {\"name\": \"street\", \"type\": \"string\"}, " + |
| " {\"name\": \"city\", \"type\": \"string\"} " + |
| " ]} " + |
| " } " + |
| "] " + |
| "}"; |
| |
| String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}}"; |
| |
| |
| // conversion to GenericData.Record |
| Schema nn = new Schema.Parser().parse(mySchema); |
| GenericData.Record record = TestUtil.jsonToAvro(json, mySchema); |
| |
| try { |
| CarbonWriter writer = CarbonWriter.builder().outputPath(path).withAvroInput(nn).writtenBy("AvroCarbonWriterTest").build(); |
| for (int i = 0; i < 100; i++) { |
| writer.write(record); |
| } |
| writer.close(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail(e.getMessage()); |
| } |
| |
| File[] dataFiles = new File(path).listFiles(new FileFilter() { |
| @Override |
| public boolean accept(File pathname) { |
| return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); |
| } |
| }); |
| Assert.assertNotNull(dataFiles); |
| Assert.assertEquals(1, dataFiles.length); |
| |
| FileUtils.deleteDirectory(new File(path)); |
| } |
| |
| |
| private void WriteAvroComplexData(String mySchema, String json, String[] sortColumns) |
| throws IOException, InvalidLoadOptionException { |
| |
| // conversion to GenericData.Record |
| Schema nn = new Schema.Parser().parse(mySchema); |
| GenericData.Record record = TestUtil.jsonToAvro(json, mySchema); |
| try { |
| CarbonWriter writer = |
| CarbonWriter.builder().outputPath(path).sortBy(sortColumns).withAvroInput(nn).writtenBy("AvroCarbonWriterTest").build(); |
| for (int i = 0; i < 100; i++) { |
| writer.write(record); |
| } |
| writer.close(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| private void WriteAvroComplexDataAndRead(String mySchema) |
| throws IOException, InvalidLoadOptionException, InterruptedException { |
| |
| // conversion to GenericData.Record |
| Schema nn = new Schema.Parser().parse(mySchema); |
| try { |
| CarbonWriter writer = |
| CarbonWriter.builder() |
| .outputPath(path) |
| .withAvroInput(mySchema) |
| .writtenBy("AvroCarbonWriterTest") |
| .build(); |
| int numOfRows = 100000/100; |
| int numOfWrite = 20000; |
| int arrayLength = 300; |
| for (int i = 0; i < numOfRows; i++) { |
| StringBuffer aa1 = new StringBuffer(); |
| StringBuffer bb1 = new StringBuffer(); |
| StringBuffer cc1 = new StringBuffer(); |
| aa1.append("[0.1234567,0.2,-0.3,0.4]"); |
| bb1.append("[0.2123456]"); |
| cc1.append("[0.3123456]"); |
| for (int j = 1; j < arrayLength; j++) { |
| aa1.append(",[1" + i + "" + j + ".1234567,1" + i + "" + j + ".2,-1" + i + "" + j + ".3,1" + i + "" + j + ".4]"); |
| bb1.append(",[2" + i + "" + j + ".2123456,-2" + i + "" + j + ".2]"); |
| cc1.append(",[3" + i + "" + j + ".3123456]"); |
| } |
| String json = "{\"fileName\":\"bob\", \"id\":10, " |
| + " \"aa1\" : [" + aa1 + "], " |
| + "\"bb1\" : [" + bb1 + "], " + |
| "\"cc1\" : [" + cc1 + "]}"; |
| |
| writer.write(json); |
| if (i > 0 && i % numOfWrite == 0) { |
| writer.close(); |
| writer = |
| CarbonWriter.builder() |
| .outputPath(path) |
| .withAvroInput(mySchema) |
| .writtenBy("AvroCarbonWriterTest") |
| .build(); |
| } |
| } |
| writer.close(); |
| String[] projection = new String[nn.getFields().size()]; |
| for (int j = 0; j < nn.getFields().size(); j++) { |
| projection[j] = nn.getFields().get(j).name(); |
| } |
| CarbonReader carbonReader = CarbonReader.builder().projection(projection).withFolder(path).build(); |
| int sum = 0; |
| while (carbonReader.hasNext()) { |
| sum++; |
| Object[] row = (Object[]) carbonReader.readNextRow(); |
| Assert.assertTrue(row.length == 5); |
| Object[] aa1 = (Object[]) row[2]; |
| Assert.assertTrue(aa1.length == arrayLength); |
| Object[] aa2 = (Object[]) aa1[1]; |
| Assert.assertTrue(aa2.length == 4 || aa2.length == 2 || aa2.length == 1); |
| } |
| Assert.assertTrue(sum == numOfRows); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| @Test |
| public void testWriteComplexRecord() throws IOException, InvalidLoadOptionException { |
| FileUtils.deleteDirectory(new File(path)); |
| |
| String mySchema = |
| "{" + |
| " \"name\": \"address\", " + |
| " \"type\": \"record\", " + |
| " \"fields\": [ " + |
| " { \"name\": \"name\", \"type\": \"string\"}, " + |
| " { \"name\": \"age\", \"type\": \"int\"}, " + |
| " { " + |
| " \"name\": \"address\", " + |
| " \"type\": { " + |
| " \"type\" : \"record\", " + |
| " \"name\" : \"my_address\", " + |
| " \"fields\" : [ " + |
| " {\"name\": \"street\", \"type\": \"string\"}, " + |
| " {\"name\": \"city\", \"type\": \"string\"} " + |
| " ]} " + |
| " }, " + |
| " {\"name\" :\"doorNum\", " + |
| " \"type\" : { " + |
| " \"type\" :\"array\", " + |
| " \"items\":{ " + |
| " \"name\" :\"EachdoorNums\", " + |
| " \"type\" : \"int\", " + |
| " \"default\":-1} " + |
| " } " + |
| " }] " + |
| "}"; |
| |
| String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}, " |
| + " \"doorNum\" : [1,2,3,4]}"; |
| |
| WriteAvroComplexData(mySchema, json, null); |
| |
| File[] dataFiles = new File(path).listFiles(new FileFilter() { |
| @Override |
| public boolean accept(File pathname) { |
| return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); |
| } |
| }); |
| Assert.assertNotNull(dataFiles); |
| Assert.assertEquals(1, dataFiles.length); |
| |
| FileUtils.deleteDirectory(new File(path)); |
| } |
| |
| |
| @Test |
| public void testWriteComplexRecordWithSortColumns() throws IOException { |
| FileUtils.deleteDirectory(new File(path)); |
| |
| String mySchema = |
| "{" + |
| " \"name\": \"address\", " + |
| " \"type\": \"record\", " + |
| " \"fields\": [ " + |
| " { \"name\": \"name\", \"type\": \"string\"}, " + |
| " { \"name\": \"age\", \"type\": \"int\"}, " + |
| " { " + |
| " \"name\": \"address\", " + |
| " \"type\": { " + |
| " \"type\" : \"record\", " + |
| " \"name\" : \"my_address\", " + |
| " \"fields\" : [ " + |
| " {\"name\": \"street\", \"type\": \"string\"}, " + |
| " {\"name\": \"city\", \"type\": \"string\"} " + |
| " ]} " + |
| " }, " + |
| " {\"name\" :\"doorNum\", " + |
| " \"type\" : { " + |
| " \"type\" :\"array\", " + |
| " \"items\":{ " + |
| " \"name\" :\"EachdoorNums\", " + |
| " \"type\" : \"int\", " + |
| " \"default\":-1} " + |
| " } " + |
| " }] " + |
| "}"; |
| |
| String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}, " |
| + " \"doorNum\" : [1,2,3,4]}"; |
| |
| try { |
| WriteAvroComplexData(mySchema, json, new String[] { "doorNum" }); |
| Assert.fail(); |
| } catch (Exception e) { |
| Assert.assertTrue(true); |
| } |
| FileUtils.deleteDirectory(new File(path)); |
| } |
| |
| @Test |
| public void testWriteArrayArrayFloat() throws IOException { |
| FileUtils.deleteDirectory(new File(path)); |
| |
| String mySchema = |
| "{" + |
| " \"name\": \"address\", " + |
| " \"type\": \"record\", " + |
| " \"fields\": [ " + |
| " { \"name\": \"fileName\", \"type\": \"string\"}, " + |
| " { \"name\": \"id\", \"type\": \"int\"}, " + |
| " {\"name\" :\"aa1\", " + |
| " \"type\" : { " + |
| " \"type\" :\"array\", " + |
| " \"items\":{ " + |
| " \"name\" :\"aa2\", " + |
| " \"type\" :\"array\", " + |
| " \"items\":{ " + |
| " \"name\" :\"f1\", " + |
| " \"type\" : \"float\", " + |
| " \"default\":-1}}}}," + |
| " {\"name\" :\"bb1\", " + |
| " \"type\" : { " + |
| " \"type\" :\"array\", " + |
| " \"items\":{ " + |
| " \"name\" :\"bb2\", " + |
| " \"type\" :\"array\", " + |
| " \"items\":{ " + |
| " \"name\" :\"f2\", " + |
| " \"type\" : \"float\", " + |
| " \"default\":-1}}}}," + |
| " {\"name\" :\"cc1\", " + |
| " \"type\" : { " + |
| " \"type\" :\"array\", " + |
| " \"items\":{ " + |
| " \"name\" :\"cc2\", " + |
| " \"type\" :\"array\", " + |
| " \"items\":{ " + |
| " \"name\" :\"f3\", " + |
| " \"type\" : \"float\", " + |
| " \"default\":-1}}}}" + |
| "] " + |
| "}"; |
| |
| try { |
| WriteAvroComplexDataAndRead(mySchema); |
| Assert.assertTrue(true); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail(); |
| } |
| FileUtils.deleteDirectory(new File(path)); |
| } |
| |
| @Test |
| public void testExceptionForDuplicateColumns() throws IOException, InvalidLoadOptionException { |
| Field[] field = new Field[2]; |
| field[0] = new Field("name", DataTypes.STRING); |
| field[1] = new Field("name", DataTypes.STRING); |
| CarbonWriterBuilder writer = CarbonWriter.builder() |
| .uniqueIdentifier(System.currentTimeMillis()).outputPath(path); |
| |
| try { |
| writer.withCsvInput(new org.apache.carbondata.sdk.file.Schema(field)).writtenBy("AvroCarbonWriterTest").build(); |
| Assert.fail(); |
| } catch (Exception e) { |
| assert(e.getMessage().contains("Duplicate column name found in table schema")); |
| } |
| FileUtils.deleteDirectory(new File(path)); |
| } |
| |
| @Test |
| public void testExceptionForInvalidDate() throws IOException, InvalidLoadOptionException { |
| Field[] field = new Field[2]; |
| field[0] = new Field("name", DataTypes.STRING); |
| field[1] = new Field("date", DataTypes.DATE); |
| CarbonWriterBuilder writer = CarbonWriter.builder() |
| .uniqueIdentifier(System.currentTimeMillis()).outputPath(path); |
| |
| try { |
| Map<String, String> loadOptions = new HashMap<String, String>(); |
| loadOptions.put("bad_records_action", "fail"); |
| CarbonWriter carbonWriter = |
| writer.withLoadOptions(loadOptions).withCsvInput(new org.apache.carbondata.sdk.file.Schema(field)).writtenBy("AvroCarbonWriterTest").build(); |
| carbonWriter.write(new String[] { "k", "20-02-2233" }); |
| carbonWriter.close(); |
| Assert.fail(); |
| } catch (Exception e) { |
| assert(e.getMessage().contains("Data load failed due to bad record")); |
| } |
| FileUtils.deleteDirectory(new File(path)); |
| } |
| |
| @Test |
| public void testWriteBasicForFloat() throws IOException { |
| FileUtils.deleteDirectory(new File(path)); |
| |
| // Avro schema |
| String avroSchema = |
| "{" + " \"type\" : \"record\"," + " \"name\" : \"Acme\"," + " \"fields\" : [" |
| + "{ \"name\" : \"name\", \"type\" : \"string\" }," |
| + "{ \"name\" : \"age\", \"type\" : \"int\" }," + "{ \"name\" : \"salary\", \"type\" " |
| + ": \"float\" }]" + "}"; |
| |
| String json = "{\"name\":\"bob\", \"age\":10, \"salary\":10.100}"; |
| |
| // conversion to GenericData.Record |
| GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema); |
| try { |
| CarbonWriter writer = CarbonWriter.builder().outputPath(path) |
| .withAvroInput(new Schema.Parser().parse(avroSchema)).writtenBy("AvroCarbonWriterTest").build(); |
| |
| for (int i = 0; i < 100; i++) { |
| writer.write(record); |
| } |
| writer.close(); |
| TableInfo tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path, "", |
| ""), false); |
| List<String> dataTypes = new ArrayList<>(); |
| for(ColumnSchema columnSchema: tableInfo.getFactTable().getListOfColumns()) { |
| dataTypes.add(columnSchema.getDataType().toString()); |
| } |
| assert(dataTypes.contains("FLOAT")); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail(e.getMessage()); |
| } |
| } |
| |
| } |