#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------

#
#
# For a given pair of attribute sets, compute bivariate statistics between all attribute pairs 
#   Given, index1 = {A_11, A_12, ... A_1m} and index2 = {A_21, A_22, ... A_2n} 
#          compute bivariate stats for m*n pairs (A_1i, A_2j), (1<= i <=m) and (1<= j <=n)
#
# Six inputs:  
#    1) X  - input data
#    2) index1 - First attribute set {A_11, A_12, ... A_1m}
#    3) index2 - Second attribute set {A_21, A_22, ... A_2n}
#    4) types1 - kind for attributes in S1 
#    5) types2 - kind for attributes in S2
#             kind=1 for scale, kind=2 for nominal, kind=3 for ordinal
# 
# One output:    
#    6) output directory in which following (maximum of) four statistics files are created
#        + bivar.scale.scale.stats - matrix containing scale-scale correlations
#        + bivar.nominal.nominal.stats - 
#        + bivar.nominal.scale.stats - 
#        + bivar.ordinal.ordinal.stats - 
#
# hadoop jar SystemML.jar -f bivar-stats.dml -nvargs X=<Data>
#                                                    index1=<Feature Index Set 1>
#                                                    index2=<Feature Index Set 2>
#                                                    types1=<Feature Types 1>
#                                                    types2=<Feature Types 2>
#                                                    OUTDIR=<Output Location>

D = read($X);  # input data set
S1 = read($index1); # attribute set 1
S2 = read($index2); # attribute set 2
K1 = read($types1); # kind for attributes in S1
K2 = read($types2); # kind for attributes in S2

s1size = ncol(S1);
s2size = ncol(S2);
numPairs = s1size * s2size;

#test: 1 is Pearson'R, 2 is F-test, 3 is chi-squared, 4 is Spearman'sRho
# R, (chisq, df, pval, cramersv,) spearman, eta, anovaf, feature_col_index1, feature_col_index2, test

num_scale_scale_tests = 0
num_nominal_nominal_tests = 0
num_ordinal_ordinal_tests = 0
num_nominal_scale_tests = 0

pair2row = matrix(0, rows=numPairs, cols=2)
for( i in 1:s1size, check=0) {
    pre_a1 = as.scalar(S1[1,i]);
    pre_k1 = as.scalar(K1[1,i]);

    for( j in 1:s2size, check=0) {
        pre_pairID = (i-1)*s2size+j; 
        pre_a2 = as.scalar(S2[1,j]);
        pre_k2 = as.scalar(K2[1,j]);
	
	if (pre_k1 == pre_k2) {
            if (pre_k1 == 1) {
	        	num_scale_scale_tests = num_scale_scale_tests + 1
				pair2row[pre_pairID,1] = num_scale_scale_tests
            } else {
	      		num_nominal_nominal_tests = num_nominal_nominal_tests + 1
				pair2row[pre_pairID,1] = num_nominal_nominal_tests
		
                if ( pre_k1 == 3 ) {
		    		num_ordinal_ordinal_tests = num_ordinal_ordinal_tests + 1
		    		pair2row[pre_pairID, 2] = num_ordinal_ordinal_tests
                }
            }
        }
        else {
            if (pre_k1 == 1 | pre_k2 == 1) {
	        	num_nominal_scale_tests = num_nominal_scale_tests + 1
				pair2row[pre_pairID,1] = num_nominal_scale_tests
            } else {
	        	num_nominal_nominal_tests = num_nominal_nominal_tests + 1
				pair2row[pre_pairID,1] = num_nominal_nominal_tests 
            }
		}
    }
}

size_scale_scale_tests     = max(num_scale_scale_tests, 1);
size_nominal_nominal_tests = max(num_nominal_nominal_tests, 1)
size_ordinal_ordinal_tests = max(num_ordinal_ordinal_tests, 1);
size_nominal_scale_tests   = max(num_nominal_scale_tests, 1);

basestats                 = matrix(0, rows=11, cols=numPairs);
basestats_scale_scale     = matrix(0, rows=6, cols=size_scale_scale_tests)
basestats_nominal_nominal = matrix(0, rows=6, cols=size_nominal_nominal_tests)
basestats_ordinal_ordinal = matrix(0, rows=3, cols=size_ordinal_ordinal_tests)
basestats_nominal_scale   = matrix(0, rows=11, cols=size_nominal_scale_tests)


# Compute max domain size among all categorical attributes
# and check if these cols have been recoded

debug_str = "Stopping execution of DML script due to invalid input";

error_flag = FALSE;

maxs = colMaxs(D);
mins = colMins(D)
maxDomainSize = -1.0;
for(k in 1:ncol(K1) ) {
  type = as.scalar(K1[1,k]);
  
  if ( type > 1) {
    colID = as.scalar(S1[1,k]);
    
    colMaximum = as.scalar(maxs[1,colID]);
    if(maxDomainSize < colMaximum) maxDomainSize = colMaximum;
  
  	colMinimum = as.scalar(mins[1,colID]);
  	if(colMinimum < 1){
  	  if(type == 2)
  	    debug_str = append(debug_str, "Column " + colID + " was declared as nominal but its minimum value is " + colMinimum)
  	  else
  	    debug_str = append(debug_str, "Column " + colID + " was declared as ordinal but its minimum value is " + colMinimum)
  	  error_flag = TRUE;
  	}
  }
}

for(k in 1:ncol(K2) ) {
  type = as.scalar(K2[1,k]);
  
  if ( type > 1) {
    colID = as.scalar(S2[1,k]);
    
    colMaximum = as.scalar(maxs[1,colID]);
    if(maxDomainSize < colMaximum) maxDomainSize = colMaximum;
  
  	colMinimum = as.scalar(mins[1,colID]);
  	if(colMinimum < 1){
  	  if(type == 2)
  	    debug_str = append(debug_str, "Column " + colID + " was declared as nominal but its minimum value is " + colMinimum)
  	  else 
  	  	debug_str = append(debug_str, "Column " + colID + " was declared as ordinal but its minimum value is " + colMinimum)
  	  error_flag = TRUE;
  	}
  }
}
maxDomain = as.integer(maxDomainSize);

if(error_flag) stop(debug_str);

parfor( i in 1:s1size, check=0) {
    a1 = as.scalar(S1[1,i]);
    k1 = as.scalar(K1[1,i]);
    A1 = D[,a1];

    parfor( j in 1:s2size, check=0) {
        pairID = (i-1)*s2size+j; 
        a2 = as.scalar(S2[1,j]);
        k2 = as.scalar(K2[1,j]);
        A2 = D[,a2];

		rowid1 = as.scalar(pair2row[pairID, 1])
    	rowid2 = as.scalar(pair2row[pairID, 2])

        if (k1 == k2) {
            if (k1 == 1) {
                # scale-scale
                print("[" + i + "," + j + "] scale-scale");
                [r, cov, sigma1, sigma2] = bivar_ss(A1,A2);   
		
    			basestats_scale_scale[1,rowid1] = a1;
				basestats_scale_scale[2,rowid1] = a2;	
                basestats_scale_scale[3,rowid1] = r;
                basestats_scale_scale[4,rowid1] = cov;
                basestats_scale_scale[5,rowid1] = sigma1;
                basestats_scale_scale[6,rowid1] = sigma2;
            } else {
                # nominal-nominal or ordinal-ordinal
                print("[" + i + "," + j + "] categorical-categorical");
                [chisq, df, pval, cramersv]  = bivar_cc(A1, A2, maxDomain);

                basestats_nominal_nominal[1,rowid1] = a1;
				basestats_nominal_nominal[2,rowid1] = a2;	
                basestats_nominal_nominal[3,rowid1] = chisq;
                basestats_nominal_nominal[4,rowid1] = df;
                basestats_nominal_nominal[5,rowid1] = pval;
                basestats_nominal_nominal[6,rowid1] = cramersv;

                if ( k1 == 3 ) {
                    # ordinal-ordinal
                    print("[" + i + "," + j + "] ordinal-ordinal");
                    sp = bivar_oo(A1, A2, maxDomain);

                    basestats_ordinal_ordinal[1,rowid2] = a1;
                    basestats_ordinal_ordinal[2,rowid2] = a2;
                    basestats_ordinal_ordinal[3,rowid2] = sp;
                }
            }
        } else {
            if (k1 == 1 | k2 == 1) {
                # Scale-nominal/ordinal     
                print("[" + i + "," + j + "] scale-categorical");
                
               	if ( k1 == 1 ) {
                	[eta, f, pval, bw_ss, within_ss, bw_df, within_df, bw_mean_square, within_mean_square] = bivar_sc(A1, A2, maxDomain);
                } else {
                    [eta, f, pval, bw_ss, within_ss, bw_df, within_df, bw_mean_square, within_mean_square] = bivar_sc(A2, A1, maxDomain);
                }
		
                basestats_nominal_scale[1,rowid1] = a1;
                basestats_nominal_scale[2,rowid1] = a2;
                basestats_nominal_scale[3,rowid1] = eta;
                basestats_nominal_scale[4,rowid1] = f;
                basestats_nominal_scale[5,rowid1] = pval;
                basestats_nominal_scale[6,rowid1] = bw_ss;
                basestats_nominal_scale[7,rowid1] = within_ss;
                basestats_nominal_scale[8,rowid1] = bw_df;
                basestats_nominal_scale[9,rowid1] = within_df;
                basestats_nominal_scale[10,rowid1] = bw_mean_square;
                basestats_nominal_scale[11,rowid1] = within_mean_square;
            } else {
                # nominal-ordinal or ordinal-nominal
                print("[" + i + "," + j + "] categorical-categorical");
                [chisq, df, pval, cramersv]  = bivar_cc(A1, A2, maxDomain);

				basestats_nominal_nominal[1,rowid1] = a1;
				basestats_nominal_nominal[2,rowid1] = a2;
                basestats_nominal_nominal[3,rowid1] = chisq;
                basestats_nominal_nominal[4,rowid1] = df;
                basestats_nominal_nominal[5,rowid1] = pval;
                basestats_nominal_nominal[6,rowid1] = cramersv;
            }
        }
    }
}

if(num_scale_scale_tests == size_scale_scale_tests){
  write(basestats_scale_scale, $OUTDIR + "/bivar.scale.scale.stats");
}

if(num_nominal_scale_tests == size_nominal_scale_tests){
  write(basestats_nominal_scale, $OUTDIR + "/bivar.nominal.scale.stats");
}

if(num_nominal_nominal_tests == size_nominal_nominal_tests){
  write(basestats_nominal_nominal, $OUTDIR + "/bivar.nominal.nominal.stats");
}

if(num_ordinal_ordinal_tests == size_ordinal_ordinal_tests){
  write(basestats_ordinal_ordinal, $OUTDIR + "/bivar.ordinal.ordinal.stats");
}

# -----------------------------------------------------------------------------------------------------------

bivar_cc = function(Matrix[Double] A, Matrix[Double] B, Double maxDomain) return (Double chisq, Double df, Double pval, Double cramersv) {

    # Contingency Table
    F = table(A, B, maxDomain, maxDomain);
    F = F[1:max(A), 1:max(B)];

    # Chi-Squared
    W = sum(F);
    r = rowSums(F);
    c = colSums(F);
    E = (r %*% c)/W;
    T = (F-E)^2/E;
    chi_squared = sum(T);

    # compute p-value
    degFreedom = (nrow(F)-1)*(ncol(F)-1);
    pValue = pchisq(target=chi_squared, df=degFreedom, lower.tail=FALSE);

    # Cramer's V
    R = nrow(F);
    C = ncol(F);
    q = min(R,C);
    cramers_v = sqrt(chi_squared/(W*(q-1)));

    # Assign return values
    chisq = chi_squared;
    df = as.double(degFreedom);
    pval = pValue;
    cramersv = cramers_v;
}

# -----------------------------------------------------------------------------------------------------------

bivar_ss = function(Matrix[Double] X, Matrix[Double] Y) return (Double R, Double covXY, Double sigmaX, Double sigmaY) {

    # Unweighted co-variance
    covXY = cov(X,Y);

    # compute standard deviations for both X and Y by computing 2^nd central moment
    W = nrow(X);
    m2X = moment(X,2);
    m2Y = moment(Y,2);
    sigmaX = sqrt(m2X * (W/(W-1.0)) );
    sigmaY = sqrt(m2Y * (W/(W-1.0)) );

    # Pearson's R
    R = covXY / (sigmaX*sigmaY);
}

# -----------------------------------------------------------------------------------------------------------

# Y points to SCALE variable
# A points to CATEGORICAL variable
bivar_sc = function(Matrix[Double] Y, Matrix[Double] A, Double maxDomain) 
		   return (Double Eta, Double AnovaF, Double pval, Double bw_ss, Double within_ss, Double bw_df, Double within_df, Double bw_mean_square, Double within_mean_square) {

    # mean and variance in target variable
    W = nrow(A);
    my = mean(Y);
    varY = moment(Y,2) * W/(W-1.0)

    # category-wise (frequencies, means, variances)
    CFreqs = aggregate(target=Y, groups=A, fn="count", ngroups=maxDomain); 
    CMeans = aggregate(target=Y, groups=A, fn="mean", ngroups=maxDomain);
    CVars =  aggregate(target=Y, groups=A, fn="variance", ngroups=maxDomain);
    
    # number of categories
    R = nrow(CFreqs);

    Eta = sqrt(1 - ( sum((CFreqs-1)*CVars) / ((W-1)*varY) ));

    bw_ss = sum( (CFreqs*(CMeans-my)^2) );
    bw_df = as.double(R-1);
    bw_mean_square = bw_ss/bw_df;
	
    within_ss = sum( (CFreqs-1)*CVars );
    within_df = as.double(W-R);
    within_mean_square = within_ss/within_df;
	
    AnovaF = bw_mean_square/within_mean_square;
    
    pval = pf(target=AnovaF, df1=bw_df, df2=within_df, lower.tail=FALSE)
}


# -----------------------------------------------------------------------------------------------------------
# Function to compute ranks
# takes a column vector as input, and produces a vector of same size in which each cell denotes to the computed score for that category
computeRanks = function(Matrix[Double] X) return (Matrix[Double] Ranks) {
    Ranks = cumsum(X) - X/2 + 1/2;
}

#-------------------------------------------------------------------------

bivar_oo = function(Matrix[Double] A, Matrix[Double] B, Double maxDomain) return (Double sp) {

    # compute contingency table
    F = table(A, B, maxDomain, maxDomain);
    F = F[1:max(A), 1:max(B)];
    
    catA = nrow(F);  # number of categories in A
    catB = ncol(F);  # number of categories in B

    # compute category-wise counts for both the attributes
    R = rowSums(F);
    S = colSums(F);

    # compute scores, both are column vectors
    [C] = computeRanks(R);
    meanX = mean(C,R); 

    columnS = t(S);
    [D] = computeRanks(columnS);

    # scores (C,D) are individual values, and counts (R,S) act as weights
    meanY = mean(D,columnS);

    W = sum(F); # total weight, or total #cases
    varX = moment(C,R,2)*(W/(W-1.0));
    varY = moment(D,columnS,2)*(W/(W-1.0));
    covXY = sum( t(F/(W-1) * (C-meanX)) * (D-meanY) );

    sp = covXY/(sqrt(varX)*sqrt(varY));
}

# -----------------------------------------------------------------------------------------------------------
