| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.runtime.transform.encode; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| |
| import org.apache.wink.json4j.JSONException; |
| import org.apache.wink.json4j.JSONObject; |
| import org.apache.sysds.lops.Lop; |
| import org.apache.sysds.runtime.matrix.data.FrameBlock; |
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; |
| import org.apache.sysds.runtime.transform.TfUtils.TfMethod; |
| import org.apache.sysds.runtime.transform.meta.TfMetaUtils; |
| |
| public class EncoderRecode extends Encoder |
| { |
| private static final long serialVersionUID = 8213163881283341874L; |
| |
| //recode maps and custom map for partial recode maps |
| private HashMap<Integer, HashMap<String, Long>> _rcdMaps = new HashMap<>(); |
| private HashMap<Integer, HashSet<Object>> _rcdMapsPart = null; |
| |
| public EncoderRecode(JSONObject parsedSpec, String[] colnames, int clen, int minCol, int maxCol) |
| throws JSONException |
| { |
| super(null, clen); |
| _colList = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfMethod.RECODE.toString(), minCol, maxCol); |
| } |
| |
| private EncoderRecode(int[] colList, int clen) { |
| super(colList, clen); |
| } |
| |
| public EncoderRecode() { |
| this(new int[0], 0); |
| } |
| |
| private EncoderRecode(int[] colList, int clen, HashMap<Integer, HashMap<String, Long>> rcdMaps) { |
| super(colList, clen); |
| _rcdMaps = rcdMaps; |
| } |
| |
| public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() { |
| return _rcdMaps; |
| } |
| |
| public HashMap<Integer, HashSet<Object>> getCPRecodeMapsPartial() { |
| return _rcdMapsPart; |
| } |
| |
| private long lookupRCDMap(int colID, String key) { |
| if( !_rcdMaps.containsKey(colID) ) |
| return -1; //empty recode map |
| Long tmp = _rcdMaps.get(colID).get(key); |
| return (tmp!=null) ? tmp : -1; |
| } |
| |
| @Override |
| public MatrixBlock encode(FrameBlock in, MatrixBlock out) { |
| if( !isApplicable() ) |
| return out; |
| |
| //build and apply recode maps |
| build(in); |
| apply(in, out); |
| |
| return out; |
| } |
| |
| @Override |
| public void build(FrameBlock in) { |
| if( !isApplicable() ) |
| return; |
| |
| Iterator<String[]> iter = in.getStringRowIterator(_colList); |
| while( iter.hasNext() ) { |
| String[] row = iter.next(); |
| for( int j=0; j<_colList.length; j++ ) { |
| int colID = _colList[j]; //1-based |
| //allocate column map if necessary |
| if( !_rcdMaps.containsKey(colID) ) |
| _rcdMaps.put(colID, new HashMap<String,Long>()); |
| //probe and build column map |
| HashMap<String,Long> map = _rcdMaps.get(colID); |
| String key = row[j]; |
| if( key!=null && !key.isEmpty() && !map.containsKey(key) ) |
| putCode(map, key); |
| } |
| } |
| } |
| |
| /** |
| * Put the code into the map with the provided key. The code depends on the type of encoder. |
| * @param map column map |
| * @param key key for the new entry |
| */ |
| protected void putCode(HashMap<String,Long> map, String key) { |
| map.put(key, Long.valueOf(map.size()+1)); |
| } |
| |
| public void buildPartial(FrameBlock in) { |
| if( !isApplicable() ) |
| return; |
| |
| //ensure allocated partial recode map |
| if( _rcdMapsPart == null ) |
| _rcdMapsPart = new HashMap<>(); |
| |
| //construct partial recode map (tokens w/o codes) |
| //iterate over columns for sequential access |
| for( int j=0; j<_colList.length; j++ ) { |
| int colID = _colList[j]; //1-based |
| //allocate column map if necessary |
| if( !_rcdMapsPart.containsKey(colID) ) |
| _rcdMapsPart.put(colID, new HashSet<>()); |
| HashSet<Object> map = _rcdMapsPart.get(colID); |
| //probe and build column map |
| for( int i=0; i<in.getNumRows(); i++ ) |
| map.add(in.get(i, colID-1)); |
| //cleanup unnecessary entries once |
| map.remove(null); |
| map.remove(""); |
| } |
| } |
| |
| @Override |
| public MatrixBlock apply(FrameBlock in, MatrixBlock out) { |
| //apply recode maps column wise |
| for( int j=0; j<_colList.length; j++ ) { |
| int colID = _colList[j]; |
| for( int i=0; i<in.getNumRows(); i++ ) { |
| Object okey = in.get(i, colID-1); |
| String key = (okey!=null) ? okey.toString() : null; |
| long code = lookupRCDMap(colID, key); |
| out.quickSetValue(i, colID-1, |
| (code >= 0) ? code : Double.NaN); |
| } |
| } |
| |
| return out; |
| } |
| |
| @Override |
| public Encoder subRangeEncoder(int colStart, int colEnd) { |
| List<Integer> cols = new ArrayList<>(); |
| HashMap<Integer, HashMap<String, Long>> rcdMaps = new HashMap<>(); |
| for (int col : _colList) { |
| if (col >= colStart && col < colEnd) { |
| // add the correct column, removed columns before start |
| // colStart - 1 because colStart is 1-based |
| int corrColumn = col - (colStart - 1); |
| cols.add(corrColumn); |
| // copy rcdMap for column |
| rcdMaps.put(corrColumn, new HashMap<>(_rcdMaps.get(col))); |
| } |
| } |
| if (cols.isEmpty()) |
| // empty encoder -> sub range encoder does not exist |
| return null; |
| |
| int[] colList = cols.stream().mapToInt(i -> i).toArray(); |
| return new EncoderRecode(colList, colEnd - colStart, rcdMaps); |
| } |
| |
| @Override |
| public void mergeAt(Encoder other, int col) { |
| if(other instanceof EncoderRecode) { |
| mergeColumnInfo(other, col); |
| |
| // merge together overlapping columns or add new columns |
| EncoderRecode otherRec = (EncoderRecode) other; |
| for (int otherColID : other._colList) { |
| int colID = otherColID + col - 1; |
| //allocate column map if necessary |
| if( !_rcdMaps.containsKey(colID) ) |
| _rcdMaps.put(colID, new HashMap<>()); |
| |
| HashMap<String, Long> otherMap = otherRec._rcdMaps.get(otherColID); |
| if(otherMap != null) { |
| // for each column, add all non present recode values |
| for(Map.Entry<String, Long> entry : otherMap.entrySet()) { |
| if (lookupRCDMap(colID, entry.getKey()) == -1) { |
| // key does not yet exist |
| putCode(_rcdMaps.get(colID), entry.getKey()); |
| } |
| } |
| } |
| } |
| return; |
| } |
| super.mergeAt(other, col); |
| } |
| |
| public int[] numDistinctValues() { |
| int[] numDistinct = new int[_colList.length]; |
| |
| for( int j=0; j<_colList.length; j++ ) { |
| int colID = _colList[j]; //1-based |
| numDistinct[j] = _rcdMaps.get(colID).size(); |
| } |
| return numDistinct; |
| } |
| |
| @Override |
| public FrameBlock getMetaData(FrameBlock meta) { |
| if( !isApplicable() ) |
| return meta; |
| |
| //inverse operation to initRecodeMaps |
| |
| //allocate output rows |
| int maxDistinct = 0; |
| for( int j=0; j<_colList.length; j++ ) |
| if( _rcdMaps.containsKey(_colList[j]) ) |
| maxDistinct = Math.max(maxDistinct, _rcdMaps.get(_colList[j]).size()); |
| meta.ensureAllocatedColumns(maxDistinct); |
| |
| //create compact meta data representation |
| StringBuilder sb = new StringBuilder(); //for reuse |
| for( int j=0; j<_colList.length; j++ ) { |
| int colID = _colList[j]; //1-based |
| int rowID = 0; |
| if( _rcdMaps.containsKey(_colList[j]) ) |
| for( Entry<String, Long> e : _rcdMaps.get(colID).entrySet() ) { |
| meta.set(rowID++, colID-1, |
| constructRecodeMapEntry(e.getKey(), e.getValue(), sb)); |
| } |
| meta.getColumnMetadata(colID-1).setNumDistinct( |
| _rcdMaps.get(colID).size()); |
| } |
| |
| return meta; |
| } |
| |
| |
| /** |
| * Construct the recodemaps from the given input frame for all |
| * columns registered for recode. |
| * |
| * @param meta frame block |
| */ |
| @Override |
| public void initMetaData( FrameBlock meta ) { |
| if( meta == null || meta.getNumRows()<=0 ) |
| return; |
| |
| for( int j=0; j<_colList.length; j++ ) { |
| int colID = _colList[j]; //1-based |
| _rcdMaps.put(colID, meta.getRecodeMap(colID-1)); |
| } |
| } |
| |
| /** |
| * Returns the Recode map entry which consists of concatenation of code, delimiter and token. |
| * |
| * @param token is part of Recode map |
| * @param code is code for token |
| * @return the concatenation of token and code with delimiter in between |
| */ |
| public static String constructRecodeMapEntry(String token, Long code) { |
| StringBuilder sb = new StringBuilder(token.length()+16); |
| return constructRecodeMapEntry(token, code, sb); |
| } |
| |
| private static String constructRecodeMapEntry(String token, Long code, StringBuilder sb) { |
| sb.setLength(0); //reset reused string builder |
| return sb.append(token).append(Lop.DATATYPE_PREFIX) |
| .append(code.longValue()).toString(); |
| } |
| |
| /** |
| * Splits a Recode map entry into its token and code. |
| * |
| * @param value concatenation of token and code with delimiter in between |
| * @return string array of token and code |
| */ |
| public static String[] splitRecodeMapEntry(String value) { |
| // Instead of using splitCSV which is forcing string with RFC-4180 format, |
| // using Lop.DATATYPE_PREFIX separator to split token and code |
| int pos = value.toString().lastIndexOf(Lop.DATATYPE_PREFIX); |
| return new String[] {value.substring(0, pos), value.substring(pos+1)}; |
| } |
| } |