blob: 062efe6b9e3a13358d3fa4d10c92bc34a268cbde [file] [log] [blame]
# -------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -------------------------------------------------------------
import os
from typing import Dict, Optional, Sequence, Tuple, Union
import numpy as np
from py4j.java_gateway import JavaObject, JVMView
from systemds.context import SystemDSContext
from systemds.operator import OperationNode
from systemds.utils.consts import VALID_INPUT_TYPES
from systemds.utils.converters import numpy_to_matrix_block
# TODO: consider replacing this class with a `matrix` function; doing so would
# require adding the numpy-specific behavior below to `OperationNode`.
class Matrix(OperationNode):
    """DAG node for a matrix backed by a local numpy array or by a file path.

    The node is created as a ``read`` operation. When it wraps a numpy array,
    the array is kept in Python and handed to the prepared script through
    :meth:`pass_python_data_to_prepared_script`; when it wraps a path, only the
    quoted path is placed into the generated ``read`` call.
    """

    # Local numpy data behind this node; None when the matrix is read from a path.
    _np_array: Optional[np.ndarray]

    def __init__(self, sds_context: 'SystemDSContext', mat: Union[np.ndarray, str, os.PathLike],
                 *args: Sequence[VALID_INPUT_TYPES],
                 **kwargs: Dict[str, VALID_INPUT_TYPES]) -> None:
        """Generate DAGNode representing matrix with data either given by a numpy array, which will be sent to SystemDS
        on need, or a path pointing to a matrix.

        :param sds_context: the context this node is created in
        :param mat: the numpy array or path (``str`` or ``os.PathLike``) to matrix file
        :param args: unnamed parameters
        :param kwargs: named parameters
        """
        # Accept every path flavour the signature advertises (str, pathlib.Path, ...);
        # checking only for `str` made a Path object look like numpy data.
        if isinstance(mat, (str, os.PathLike)):
            unnamed_params = [f'\'{os.fsdecode(mat)}\'']
            named_params = {}
            self._np_array = None
        else:
            # TODO better alternative than format string?
            # `{file_name}` is a placeholder that `code_line` fills in with the variable name.
            unnamed_params = ['\'./tmp/{file_name}\'']
            named_params = {'rows': -1, 'cols': -1}
            self._np_array = mat
        unnamed_params.extend(args)
        named_params.update(kwargs)
        super().__init__(sds_context, 'read', unnamed_params,
                         named_params, is_python_local_data=self._is_numpy())

    def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, prepared_script: JavaObject) -> None:
        """Bind the local numpy array to ``var_name`` in the prepared script.

        :param jvm: JVM view used to convert the numpy array into a matrix block
        :param var_name: name of the script variable the matrix is bound to
        :param prepared_script: the prepared script object receiving the matrix
        """
        assert self.is_python_local_data, 'Can only pass data to prepared script if it is python local!'
        if self._is_numpy():
            prepared_script.setMatrix(var_name, numpy_to_matrix_block(
                jvm, self._np_array), True)  # True for reuse

    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
                  named_input_vars: Dict[str, str]) -> str:
        """Build the script line for this node.

        For numpy-backed matrices the ``{file_name}`` placeholder set up in
        ``__init__`` is replaced by ``var_name``.

        :param var_name: name of the variable this node is assigned to
        :param unnamed_input_vars: names of the unnamed input variables
        :param named_input_vars: mapping of parameter names to input variable names
        :return: the generated line of code
        """
        line = super().code_line(var_name, unnamed_input_vars, named_input_vars)
        if self._is_numpy():
            line = line.format(file_name=var_name)
        return line

    def compute(self, verbose: bool = False, lineage: bool = False) -> np.ndarray:
        """Return the matrix data.

        A numpy-backed matrix is returned directly without running anything;
        otherwise the computation is delegated to the parent class.

        :param verbose: if True, print additional information
        :param lineage: forwarded to the parent ``compute`` for path-backed matrices
        :return: the matrix data
        """
        if self._is_numpy():
            if verbose:
                print('[Numpy Array - No Compilation necessary]')
            return self._np_array
        return super().compute(verbose, lineage)

    def _is_numpy(self) -> bool:
        """Return True if this node holds a local numpy array."""
        return self._np_array is not None

    def rev(self) -> OperationNode:
        """ Reverses the rows in a matrix

        :return: the OperationNode representing this operation
        """
        return OperationNode(self.sds_context, 'rev', [self])

    def order(self, by: int = 1, decreasing: bool = False,
              index_return: bool = False) -> OperationNode:
        """ Sort by a column of the matrix X in increasing/decreasing order and returns either the index or data

        :param by: sort matrix by this column number (1-based)
        :param decreasing: If true the matrix will be sorted in decreasing order
        :param index_return: If true, the index numbers will be returned
        :return: the OperationNode representing this operation
        :raises IndexError: if the matrix is numpy-backed and ``by`` is not a valid column number
        """
        # The column count is only known locally for numpy-backed matrices; for
        # path-backed ones the bounds cannot be checked here.
        if self._is_numpy():
            cols = self._np_array.shape[1]
            if by < 1 or by > cols:
                raise IndexError("Index {i} is out of bounds for axis 1 with size {c}".format(i=by, c=cols))
        named_input_nodes = {'target': self, 'by': by, 'decreasing': str(decreasing).upper(),
                             'index.return': str(index_return).upper()}
        return OperationNode(self.sds_context, 'order', [], named_input_nodes=named_input_nodes)

    def t(self) -> OperationNode:
        """ Transposes the input matrix

        :return: the OperationNode representing this operation
        """
        return OperationNode(self.sds_context, 't', [self])

    def cholesky(self, safe: bool = False) -> OperationNode:
        """ Computes the Cholesky decomposition of a symmetric, positive definite matrix

        The local checks below can only run when the matrix is numpy-backed;
        for a path-backed matrix they are skipped because the data is not
        available in Python.

        :param safe: default value is False, if flag is True additional checks to ensure
            that the matrix is symmetric positive definite are applied, if False, checks will be skipped
        :return: the OperationNode representing this operation
        :raises ValueError: if the local matrix is not square, or (with ``safe``) not
            positive definite or not symmetric
        """
        if self._is_numpy():
            data = self._np_array
            # check square dimension
            if data.shape[0] != data.shape[1]:
                raise ValueError("Last 2 dimensions of the array must be square")
            if safe:
                # check if mat is positive definite
                if not np.all(np.linalg.eigvals(data) > 0):
                    raise ValueError("Matrix is not positive definite")
                # check if mat is symmetric
                if not np.allclose(data, data.transpose()):
                    raise ValueError("Matrix is not symmetric")
        return OperationNode(self.sds_context, 'cholesky', [self])