/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.iogen;
import org.apache.sysds.common.Types;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.Pair;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.UtilFunctions;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
 * Holds a sample of raw text (one {@link RawIndex} per line) together with the
 * sample matrix or frame that corresponds to it, and offers helpers to relate
 * the cells of that sample to the raw lines.
 *
 * <p>The per-cell mapping arrays and the {@link MappingProperties} are only
 * populated by {@code runMapping}. None of the constructors currently invokes
 * it, so {@link #getMapRow()}, {@link #getMapCol()}, {@link #getMapLen()} and
 * {@link #getMappingProperties()} return {@code null} unless the mapping is
 * re-enabled.
 */
public class ReaderMappingIndex {
	// For every cell (r, c): index of the raw line the value was found in, or -1.
	private int[][] mapRow;
	// For every cell (r, c): key of the pair returned by RawIndex.findValue
	// (presumably the position inside the line - confirm against RawIndex), or -1.
	private int[][] mapCol;
	// For every cell (r, c): value of the pair returned by RawIndex.findValue
	// (presumably the matched length - confirm against RawIndex), or -1.
	private int[][] mapLen;
	private MappingProperties mappingProperties;
	private final int nrows;
	private final int ncols;
	private int nlines;
	// Number of cells that are non-zero (numeric) or non-null (other types).
	private int actualValueCount;
	private RawIndex[] sampleRawIndexes;
	private MatrixBlock sampleMatrix;
	private FrameBlock sampleFrame;
	private Types.ValueType[] schema;
	private final boolean isMatrix;

	/**
	 * Creates an index over already prepared raw line indexes and a sample matrix.
	 *
	 * @param nlines           number of raw lines
	 * @param nrows            number of rows of the sample
	 * @param ncols            number of columns of the sample
	 * @param sampleRawIndexes one raw index per line
	 * @param matrix           the sample matrix
	 * @throws Exception declared for interface compatibility with the other constructors
	 */
	public ReaderMappingIndex(int nlines, int nrows, int ncols, RawIndex[] sampleRawIndexes, MatrixBlock matrix) throws Exception {
		this.nlines = nlines;
		this.nrows = nrows;
		this.ncols = ncols;
		this.sampleRawIndexes = sampleRawIndexes;
		this.sampleMatrix = matrix;
		this.isMatrix = true;
		//this.runMapping(true);
	}

	/**
	 * Creates an index from raw text and a sample matrix; the raw text is split
	 * into lines and indexed in parallel.
	 *
	 * @param raw    the raw sample text
	 * @param matrix the sample matrix
	 * @throws Exception if reading the raw text fails
	 */
	public ReaderMappingIndex(String raw, MatrixBlock matrix) throws Exception {
		this.ReadRaw(raw);
		this.isMatrix = true;
		this.sampleMatrix = matrix;
		this.nrows = this.sampleMatrix.getNumRows();
		this.ncols = this.sampleMatrix.getNumColumns();
		//this.runMapping(false);
	}

	/**
	 * Creates an index from raw text and a sample frame; the raw text is split
	 * into lines and indexed in parallel.
	 *
	 * @param raw   the raw sample text
	 * @param frame the sample frame
	 * @throws Exception if reading the raw text fails
	 */
	public ReaderMappingIndex(String raw, FrameBlock frame) throws Exception {
		this.ReadRaw(raw);
		this.isMatrix = false;
		this.sampleFrame = frame;
		this.nrows = this.sampleFrame.getNumRows();
		this.ncols = this.sampleFrame.getNumColumns();
		this.schema = this.sampleFrame.getSchema();
		//this.runMapping(false);
	}

	/**
	 * Splits the raw text into lines, stores the line count in {@code nlines} and
	 * builds one {@link RawIndex} per line using a thread pool.
	 *
	 * @param raw the raw sample text
	 * @throws Exception        if reading the lines fails
	 * @throws RuntimeException if building the raw indexes in parallel fails
	 */
	private void ReadRaw(String raw) throws Exception {
		ArrayList<String> rawList = new ArrayList<>();
		InputStream is = IOUtilFunctions.toInputStream(raw);
		// try-with-resources closes the reader (and the wrapped stream) on every path
		try(BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
			String value;
			while((value = br.readLine()) != null) {
				rawList.add(value);
			}
		}
		this.nlines = rawList.size();
		this.sampleRawIndexes = new RawIndex[this.nlines];

		int numThreads = OptimizerUtils.getParallelTextWriteParallelism();
		ExecutorService pool = null;
		try {
			pool = CommonThreadPool.get(numThreads);
			ArrayList<BuildRawIndexTask> tasks = new ArrayList<>();
			// contiguous blocks of lines, one block per thread; trailing blocks may be empty
			int blklen = (int) Math.ceil((double) nlines / numThreads);
			for(int i = 0; i < numThreads; i++) {
				int beginIndex = i * blklen;
				int endIndex = Math.min((i + 1) * blklen, nlines);
				tasks.add(new BuildRawIndexTask(rawList, this.sampleRawIndexes, beginIndex, endIndex));
			}
			//wait until all tasks have been executed
			List<Future<Object>> rt = pool.invokeAll(tasks);
			//check for exceptions
			for(Future<Object> task : rt)
				task.get();
		}
		catch(InterruptedException e) {
			// keep the interrupt visible to callers further up the stack
			Thread.currentThread().interrupt();
			throw new RuntimeException("Failed parallel ReadRaw.", e);
		}
		catch(Exception e) {
			throw new RuntimeException("Failed parallel ReadRaw.", e);
		}
		finally {
			// release the pool even if submitting or waiting failed
			if(pool != null)
				pool.shutdown();
		}
	}

	/**
	 * Locates every sample value in the raw lines and derives the mapping
	 * properties (existence of data, single/multi-line records, array layout and
	 * symmetric / skew-symmetric / pattern representations).
	 *
	 * @param isIndexMapping if {@code true}, every cell is searched for; otherwise
	 *                       only cells that are non-zero / non-null are searched
	 */
	@SuppressWarnings("unused")
	private void runMapping(boolean isIndexMapping) {
		this.mapRow = new int[nrows][ncols];
		this.mapCol = new int[nrows][ncols];
		this.mapLen = new int[nrows][ncols];
		this.mappingProperties = new MappingProperties();

		// Set "-1" as default value for all defined matrix
		for(int r = 0; r < nrows; r++)
			for(int c = 0; c < ncols; c++)
				this.mapRow[r][c] = this.mapCol[r][c] = this.mapLen[r][c] = -1;

		// itRow is shared across cells: the search for a cell resumes at the line
		// where the previous cell was found and wraps around at the last line.
		int itRow = 0;
		for(int r = 0; r < nrows; r++) {
			for(int c = 0; c < ncols; c++) {
				if(isIndexMapping || checkValueIsNotNullZero(r, c)) {
					// lines already tried for this cell; stop once every line was visited
					HashSet<Integer> checkedLines = new HashSet<>();
					while(checkedLines.size() < nlines) {
						RawIndex ri = this.sampleRawIndexes[itRow];
						Pair<Integer, Integer> pair = this.isMatrix ? ri.findValue(this.sampleMatrix.getValue(r, c)) : ri.findValue(this.sampleFrame.get(r, c), this.schema[c]);
						if(pair != null) {
							this.mapRow[r][c] = itRow;
							this.mapCol[r][c] = pair.getKey();
							this.mapLen[r][c] = pair.getValue();
							break;
						}
						else {
							checkedLines.add(itRow);
							itRow++;
							if(itRow == nlines)
								itRow = 0;
						}
					}
				}
			}
		}

		// analysis mapping of values
		// 1. check (exist, partially exist, not exist)
		actualValueCount = 0;
		int mappedValueCount = 0;
		for(int r = 0; r < nrows; r++) {
			for(int c = 0; c < ncols; c++) {
				if(checkValueIsNotNullZero(r, c)) {
					actualValueCount++;
					if(this.mapRow[r][c] != -1)
						mappedValueCount++;
				}
			}
		}
		if(actualValueCount == mappedValueCount) {
			this.mappingProperties.setTypicalRepresentation();
			this.mappingProperties.setDataFullExist();
		}
		else if(actualValueCount > 0 && mappedValueCount == 0)
			this.mappingProperties.setDataNotExist();
		else if(mappedValueCount > 0 && mappedValueCount < actualValueCount)
			this.mappingProperties.setDataPartiallyExist();

		// 2. check the records represented in single/multilines
		boolean singleLine = true;
		// first mapped value
		int[] firstLineNumbers = new int[nrows];
		for(int r = 0; r < nrows; r++) {
			int c = 0;
			firstLineNumbers[r] = -1;
			// line of the first mapped cell in row r (stays -1 if no cell is mapped)
			for(; c < ncols && firstLineNumbers[r] == -1; c++)
				firstLineNumbers[r] = mapRow[r][c];
			// other mapped
			for(; c < ncols && singleLine; c++)
				if(mapRow[r][c] != -1)
					singleLine = firstLineNumbers[r] == mapRow[r][c];
		}
		// consecutive sample rows must sit on consecutive raw lines
		for(int r = 0; r < nrows - 1 && singleLine; r++)
			singleLine = firstLineNumbers[r + 1] - firstLineNumbers[r] == 1;

		if(singleLine) {
			mappingProperties.setRecordSingleLine();
			// 3.a check for array representation
			boolean allValuesInALine = true;
			for(int r = 0; r < nrows && allValuesInALine; r++) {
				for(int c = 0; c < ncols; c++) {
					if(mapRow[r][c] != -1 && mapRow[r][c] != firstLineNumbers[0]) {
						allValuesInALine = false;
						break;
					}
				}
			}
			// when all values are in one single line it is an Array representation
			// NOTE(review): the consecutive-line test above means this can only hold
			// when nrows <= 1 - confirm that is the intended condition.
			if(allValuesInALine) {
				// check the Array is in Row or Col wise:
				// count column-neighbours whose positions in the line are out of order
				int t = 0;
				for(int c = 0; c < ncols; c++) {
					for(int r = 0; r < nrows - 1; r++) {
						// only pairs where both cells are mapped can be compared
						if(mapCol[r][c] == -1 || mapCol[r + 1][c] == -1)
							continue;
						if(mapCol[r][c] > mapCol[r + 1][c])
							t++;
					}
				}
				// float division: with actualValueCount == 0 the ratio is NaN or
				// Infinity, so the comparison is false and col-wise is selected
				if((float) t / actualValueCount < 0.03)
					this.mappingProperties.setArrayRowWiseRepresentation();
				else
					this.mappingProperties.setArrayColWiseRepresentation();
			}
		}
		else {
			mappingProperties.setRecordMultiLine();
			// 3.a check for array representation
			// TODO: array properties for multi-line
		}

		// 3.b check the Typical, Symmetric, Skew-Symmetric, Pattern, and Array
		// check for upper and lower triangular
		if(nrows == ncols && !this.mappingProperties.isRepresentation()) {
			boolean symmetricMap = true;
			// Symmetric check
			for(int r = 0; r < nrows && symmetricMap; r++) {
				for(int c = 0; c <= r && symmetricMap; c++)
					symmetricMap = this.checkSymmetricValue(r, c, 1);
			}
			if(symmetricMap)
				mappingProperties.setSymmetricRepresentation();

			// Skew-Symmetric check
			if(!mappingProperties.isRepresentation()) {
				boolean skewSymmetricMap = true;
				for(int r = 0; r < nrows && skewSymmetricMap; r++) {
					for(int c = 0; c <= r && skewSymmetricMap; c++)
						skewSymmetricMap = this.checkSymmetricValue(r, c, -1);
				}
				if(skewSymmetricMap)
					mappingProperties.setSkewSymmetricRepresentation();
			}

			// Pattern check
			if(!mappingProperties.isRepresentation()) {
				boolean patternMap = false;
				Object patternValueMap = null;
				// pattern check for Frame: in Frame the schema must be same for all columns
				boolean homoSchema = true;
				Types.ValueType vtc0 = null;
				if(!this.isMatrix) {
					vtc0 = this.sampleFrame.getSchema()[0];
					for(int c = 1; c < ncols && homoSchema; c++)
						homoSchema = this.sampleFrame.getSchema()[c] == vtc0;
				}
				// A row whose cells all hold the same value marks a pattern; if several
				// rows qualify, the value of the last such row is the one reported.
				for(int r = 0; r < nrows; r++) {
					if(this.isMatrix) { // pattern check for Matrix representation
						HashSet<Double> patternValueSet = new HashSet<>();
						for(int c = 0; c < ncols; c++)
							patternValueSet.add(this.sampleMatrix.getValue(r, c));
						if(patternValueSet.size() == 1) {
							vtc0 = Types.ValueType.FP64;
							patternMap = true;
							patternValueMap = patternValueSet.iterator().next();
						}
					}
					else { // pattern check for Frame representation
						if(homoSchema) {
							HashSet<Object> patternValueSet = new HashSet<>();
							for(int c = 0; c < ncols; c++)
								patternValueSet.add(this.sampleFrame.get(r, c));
							if(patternValueSet.size() == 1) {
								patternMap = true;
								patternValueMap = patternValueSet.iterator().next();
							}
						}
					}
				}
				if(patternMap)
					mappingProperties.setPatternRepresentation(vtc0, patternValueMap);
			}
		}
	}

	/**
	 * Tells whether cell (r, c) carries an actual value: non-zero for matrices and
	 * numeric frame columns, non-null for all other frame columns.
	 *
	 * @param r row index
	 * @param c column index
	 * @return {@code true} if the cell is non-zero / non-null
	 */
	private boolean checkValueIsNotNullZero(int r, int c) {
		if(this.isMatrix)
			return this.sampleMatrix.getValue(r, c) != 0;
		if(this.sampleFrame.getSchema()[c].isNumeric())
			return this.sampleFrame.getDouble(r, c) != 0;
		return this.sampleFrame.get(r, c) != null;
	}

	/**
	 * Compares cell (r, c) with its mirrored cell (c, r).
	 * Symmetric checks just available for numeric values in the frame
	 * representations: for non-numeric columns the factor {@code a} is ignored
	 * and the two cells are compared for equality (two null cells count as equal).
	 *
	 * @param r row index
	 * @param c column index
	 * @param a factor applied to the mirrored value: 1 for symmetric, -1 for skew-symmetric
	 * @return {@code true} if value(r, c) equals a * value(c, r)
	 */
	private boolean checkSymmetricValue(int r, int c, int a) {
		if(this.isMatrix)
			return this.sampleMatrix.getValue(r, c) == this.sampleMatrix.getValue(c, r) * a;
		if(this.sampleFrame.getSchema()[c].isNumeric())
			return this.sampleFrame.getDouble(r, c) == this.sampleFrame.getDouble(c, r) * a;
		// null-safe equality: a null cell must not abort the whole symmetry check
		Object cell = this.sampleFrame.get(r, c);
		Object mirrored = this.sampleFrame.get(c, r);
		return cell == null ? mirrored == null : cell.equals(mirrored);
	}

	/** @return line index per cell, or {@code null} if the mapping has not been run */
	public int[][] getMapRow() {
		return mapRow;
	}

	/** @return first component of the match per cell, or {@code null} if the mapping has not been run */
	public int[][] getMapCol() {
		return mapCol;
	}

	/** @return second component of the match per cell, or {@code null} if the mapping has not been run */
	public int[][] getMapLen() {
		return mapLen;
	}

	/** @return the raw index of every sample line */
	public RawIndex[] getSampleRawIndexes() {
		return sampleRawIndexes;
	}

	/** @return number of rows of the sample */
	public int getNrows() {
		return nrows;
	}

	/** @return number of columns of the sample */
	public int getNcols() {
		return ncols;
	}

	/** @return number of raw lines of the sample */
	public int getNlines() {
		return nlines;
	}

	/** @return the derived mapping properties, or {@code null} if the mapping has not been run */
	public MappingProperties getMappingProperties() {
		return mappingProperties;
	}

	/** @return number of non-zero / non-null cells counted by the mapping (0 if it has not been run) */
	public int getActualValueCount() {
		return actualValueCount;
	}

	/**
	 * Compares the sample cell (r, c) with a value given as text.
	 *
	 * @param r     row index
	 * @param c     column index
	 * @param value textual value to compare against
	 * @return {@code true} if the converted value equals the cell; {@code false}
	 *         if it differs or if the conversion / comparison throws
	 */
	public boolean compareCellValue(int r, int c, String value) {
		if(isMatrix) {
			try {
				// NOTE(review): a String is converted with ValueType.FP64 here - confirm
				// that UtilFunctions.objectToDouble accepts a String for that type.
				return sampleMatrix.getValue(r, c) == UtilFunctions.objectToDouble(Types.ValueType.FP64, value);
			}
			catch(Exception exception) {
				// unconvertible input is treated as "not equal"
				return false;
			}
		}
		else {
			try {
				return sampleFrame.get(r, c).equals(UtilFunctions.stringToObject(sampleFrame.getColumnType(c), value));
			}
			catch(Exception exception) {
				// unconvertible input or a null cell is treated as "not equal"
				return false;
			}
		}
	}

	/**
	 * Builds the {@link RawIndex} objects for the lines in [beginIndex, endIndex).
	 * Static because it does not need the enclosing instance; each task writes
	 * only to its own index range of the shared target array.
	 */
	private static class BuildRawIndexTask implements Callable<Object> {
		private final ArrayList<String> rawList;
		private final RawIndex[] sampleRawIndex;
		private final int beginIndex;
		private final int endIndex;

		public BuildRawIndexTask(ArrayList<String> rawList, RawIndex[] sampleRawIndex, int beginIndex, int endIndex) {
			this.rawList = rawList;
			this.sampleRawIndex = sampleRawIndex;
			this.beginIndex = beginIndex;
			this.endIndex = endIndex;
		}

		@Override
		public Object call() throws Exception {
			for(int i = this.beginIndex; i < this.endIndex; i++)
				this.sampleRawIndex[i] = new RawIndex(this.rawList.get(i));
			return null;
		}
	}
}