[SYSTEMDS-2677] Allow Reading unknown dimensions for algorithm input
diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py
index d80fdfa..5160c2b 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -39,6 +39,7 @@
from systemds.operator import OperationNode
from systemds.script_building import OutputType
+
class SystemDSContext(object):
"""A context with a connection to a java instance with which SystemDS operations are executed.
The java process is started and is running using a random tcp port for instruction parsing."""
@@ -274,8 +275,21 @@
return OperationNode(self, 'rand', [], named_input_nodes=named_input_nodes)
- def read(self, path: os.PathLike, **kwargs: Dict[str, VALID_INPUT_TYPES]):
- return OperationNode(self, 'read', [f'"{path}"'], named_input_nodes=kwargs)
+ def read(self, path: os.PathLike, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode':
+ """ Read a file from disk. Supported types include:
+ CSV, Matrix Market(coordinate), Text(i,j,v), SystemDS Binary
+ See: http://apache.github.io/systemds/site/dml-language-reference#readwrite-built-in-functions for more details
+ :return: an Operation Node, containing the read data.
+ """
+ return OperationNode(self, 'read', [f'"{path}"'], named_input_nodes=kwargs, shape=(-1,))
- def scalar(self, v: Dict[str, VALID_INPUT_TYPES]):
- return OperationNode(self, v, output_type=OutputType.SCALAR)
\ No newline at end of file
+ def scalar(self, v: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode':
+ """ Construct a scalar value; this can be a str, float, double, integer or boolean.
+ :return: An `OperationNode` containing the scalar value.
+ """
+ if isinstance(v, str):
+ # Quote bare strings; startswith/endswith also handle the empty string safely.
+ if not ((v.startswith('"') and v.endswith('"')) or (v.startswith("'") and v.endswith("'"))):
+ v = f'"{v}"'
+ # OutputType.ASSIGN emits a plain `var=value;` assignment, with no wrapping operation call.
+ return OperationNode(self, v, output_type=OutputType.ASSIGN)
diff --git a/src/main/python/systemds/operator/algorithm.py b/src/main/python/systemds/operator/algorithm.py
index 2af261f..b29caf1 100644
--- a/src/main/python/systemds/operator/algorithm.py
+++ b/src/main/python/systemds/operator/algorithm.py
@@ -156,10 +156,14 @@
if y.shape[0] == 0:
raise ValueError("Found array with 0 feature(s) (shape={s}) while a minimum of 1 is required."
.format(s=y.shape))
-
+ if -1 in x.shape:
+ output_shape = (-1,)
+ else:
+ output_shape = (x.shape[1],)
+
params_dict = {'X': x, 'Y': y}
params_dict.update(kwargs)
- return OperationNode(x.sds_context, 'multiLogReg', named_input_nodes=params_dict, shape = (x.shape[1],))
+ return OperationNode(x.sds_context, 'multiLogReg', named_input_nodes=params_dict, shape = output_shape)
def multiLogRegPredict(x: OperationNode, b: OperationNode, y: OperationNode, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> OperationNode:
diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py
index ebe5804..4d01a8b 100644
--- a/src/main/python/systemds/operator/operation_node.py
+++ b/src/main/python/systemds/operator/operation_node.py
@@ -161,7 +161,7 @@
return f'{output}={self.operation}({inputs_comma_sep});'
elif self.output_type == OutputType.NONE:
return f'{self.operation}({inputs_comma_sep});'
- elif self.output_type == OutputType.SCALAR:
+ elif self.output_type == OutputType.ASSIGN:
return f'{var_name}={self.operation};'
else:
return f'{var_name}={self.operation}({inputs_comma_sep});'
@@ -341,15 +341,30 @@
return OperationNode(self.sds_context, 'moment', unnamed_inputs, output_type=OutputType.DOUBLE)
def write(self, destination: str, format:str = "binary", **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode':
+ """ Write input to disk.
+ The written format is easily read by SystemDSContext.read().
+ There is no return on write.
+
+ :param destination: The location where the file is stored. Defaults to HDFS paths if available.
+ :param format: The format in which the file is saved. Default is binary to improve SystemDS reading times.
+ :param kwargs: Additional format-specific arguments; see http://apache.github.io/systemds/site/dml-language-reference#readwrite-built-in-functions
+ """
unnamed_inputs = [self, f'"{destination}"']
named_parameters = {"format":f'"{format}"'}
named_parameters.update(kwargs)
return OperationNode(self.sds_context, 'write', unnamed_inputs, named_parameters, output_type= OutputType.NONE)
def to_string(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode':
- return OperationNode(self.sds_context, 'toString', [self], kwargs, output_type= OutputType.DOUBLE)
+ """ Converts the input to a string representation.
+ :return: `OperationNode` containing the string.
+ """
+ return OperationNode(self.sds_context, 'toString', [self], kwargs, output_type= OutputType.SCALAR)
def print(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode':
+ """ Prints the given Operation Node.
+ There is no return on calling.
+ To get the returned string look at the stdout of SystemDSContext.
+ """
return OperationNode(self.sds_context, 'print', [self], kwargs, output_type= OutputType.NONE)
def rev(self) -> 'OperationNode':
diff --git a/src/main/python/systemds/script_building/dag.py b/src/main/python/systemds/script_building/dag.py
index 9e027e6..fac8d17 100644
--- a/src/main/python/systemds/script_building/dag.py
+++ b/src/main/python/systemds/script_building/dag.py
@@ -34,6 +34,7 @@
MATRIX = auto()
DOUBLE = auto()
SCALAR = auto()
+ ASSIGN = auto()
LIST = auto()
NONE = auto()
diff --git a/src/main/python/tests/examples/tutorials/test_mnist.py b/src/main/python/tests/examples/tutorials/test_mnist.py
index c4e9258..9843f4d 100644
--- a/src/main/python/tests/examples/tutorials/test_mnist.py
+++ b/src/main/python/tests/examples/tutorials/test_mnist.py
@@ -36,6 +36,7 @@
sds: SystemDSContext = None
d: DataManager = None
+ base_path = "systemds/examples/tutorials/mnist/"
@classmethod
def setUpClass(cls):
@@ -84,6 +85,29 @@
self.assertGreater(acc, 80)
+ def test_multi_log_reg_with_read(self):
+ train_count = 100
+ test_count = 100
+ X = Matrix(self.sds, self.d.get_train_data().reshape(
+ (60000, 28*28))[:train_count])
+ X.write(self.base_path + "train_data").compute()
+ Y = Matrix(self.sds, self.d.get_train_labels()[:train_count]) + 1
+ Y.write(self.base_path + "train_labels").compute()
+
+ Xr = self.sds.read(self.base_path + "train_data")
+ Yr = self.sds.read(self.base_path + "train_labels")
+
+ bias = multiLogReg(Xr, Yr, verbose=False)
+ # Test data
+ Xt = Matrix(self.sds, self.d.get_test_data().reshape(
+ (10000, 28*28))[:test_count])
+ Yt = Matrix(self.sds, self.d.get_test_labels()[:test_count])
+ Yt = Yt + 1.0
+
+ [_, _, acc] = multiLogRegPredict(Xt, bias, Yt).compute(verbose=True)
+
+ self.assertGreater(acc, 70)
+
if __name__ == "__main__":
unittest.main(exit=False)