[SYSTEMDS-2645] Python API K-Means algorithm
This commit introduce the K Means algorithm to the python API.
Currently the python interface only allows to return one value not a
list, therefore only the clusters are returned from the call to kmeans.
diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py
index 757f451..16e99c8 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -95,9 +95,9 @@
self.__stdout = Queue()
self.__stderr = Queue()
- Thread(target= self.__enqueue_output, args=(
+ Thread(target=self.__enqueue_output, args=(
process.stdout, self.__stdout), daemon=True).start()
- Thread(target= self.__enqueue_output, args=(
+ Thread(target=self.__enqueue_output, args=(
process.stderr, self.__stderr), daemon=True).start()
# Py4j connect to the started process.
@@ -106,22 +106,24 @@
self.java_gateway = JavaGateway(
gateway_parameters=gateway_parameters, java_process=process)
- def get_stdout(self, lines: int = 1):
+ def get_stdout(self, lines: int = -1):
"""Getter for the stdout of the java subprocess
The output is taken from the stdout queue and returned in a new list.
- :param lines: The number of lines to try to read from the stdout queue
+ :param lines: The number of lines to try to read from the stdout queue.
+ default -1 prints all current lines in the queue.
"""
- if self.__stdout.qsize() < lines:
+ if lines == -1 or self.__stdout.qsize() < lines:
return [self.__stdout.get() for x in range(self.__stdout.qsize())]
else:
return [self.__stdout.get() for x in range(lines)]
- def get_stderr(self, lines: int = 1):
+ def get_stderr(self, lines: int = -1):
"""Getter for the stderr of the java subprocess
The output is taken from the stderr queue and returned in a new list.
- :param lines: The number of lines to try to read from the stderr queue
+ :param lines: The number of lines to try to read from the stderr queue.
+ default -1 prints all current lines in the queue.
"""
- if self.__stderr.qsize() < lines:
+ if lines == -1 or self.__stderr.qsize() < lines:
return [self.__stderr.get() for x in range(self.__stderr.qsize())]
else:
return [self.__stderr.get() for x in range(lines)]
@@ -136,19 +138,18 @@
def close(self):
"""Close the connection to the java process and do necessary cleanup."""
- process : Popen = self.java_gateway.java_process
+ process: Popen = self.java_gateway.java_process
self.java_gateway.shutdown()
# Send SigTerm
os.kill(process.pid, 14)
- def __enqueue_output(self,out, queue):
+ def __enqueue_output(self, out, queue):
"""Method for handling the output from java.
It is locating the string handeling inside a different thread, since the 'out.readline' is a blocking command.
"""
for line in iter(out.readline, b""):
queue.put(line.decode("utf-8").strip())
-
def __get_open_port(self):
"""Get a random available port."""
# TODO Verify that it is not taking some critical ports change to select a good port range.
diff --git a/src/main/python/systemds/operator/algorithm.py b/src/main/python/systemds/operator/algorithm.py
index a9014f8..be6c018 100644
--- a/src/main/python/systemds/operator/algorithm.py
+++ b/src/main/python/systemds/operator/algorithm.py
@@ -20,16 +20,18 @@
# -------------------------------------------------------------
from typing import Dict
+
+from systemds.operator import OperationNode
from systemds.script_building.dag import DAGNode
from systemds.utils.consts import VALID_INPUT_TYPES
-from systemds.operator import OperationNode
__all__ = ['l2svm', 'lm']
+
def l2svm(x: DAGNode, y: DAGNode, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> OperationNode:
"""
Perform L2SVM on matrix with labels given.
-
+
:param x: Input dataset
:param y: Input labels in shape of one column
:param kwargs: Dictionary of extra arguments
@@ -41,16 +43,16 @@
return OperationNode(x.sds_context, 'l2svm', named_input_nodes=params_dict)
-def lm(x :DAGNode, y: DAGNode, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> OperationNode:
+def lm(x: DAGNode, y: DAGNode, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> OperationNode:
"""
Performs LM on matrix with labels given.
-
+
:param x: Input dataset
:param y: Input labels in shape of one column
:param kwargs: Dictionary of extra arguments
:return: `OperationNode` containing the model fit.
"""
-
+
x._check_matrix_op()
if x._np_array.size == 0:
raise ValueError("Found array with 0 feature(s) (shape={s}) while a minimum of 1 is required."
@@ -58,8 +60,35 @@
if y._np_array.size == 0:
raise ValueError("Found array with 0 feature(s) (shape={s}) while a minimum of 1 is required."
.format(s=y._np_array.shape))
-
+
params_dict = {'X': x, 'y': y}
params_dict.update(kwargs)
return OperationNode(x.sds_context, 'lm', named_input_nodes=params_dict)
+
+def kmeans(x: DAGNode, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> OperationNode:
+ """
+ Perfoms KMeans on matrix input.
+
+ :param x: Input dataset to perform K-Means on.
+ :param k: The Number of centroids to use for the algorithm.
+ :param runs: The Number of concurrent instances of K-Means to run (with different initial centroids).
+ :param max_iter: The Maximum number of iterations to run the K-Means algorithm for.
+ :param eps: Tolerance for the algorithm to declare convergence using WCSS change ratio.
+ :param is_verbose: Boolean flag if the algorithm should be run in a verbose manner.
+ :param avg_sample_size_per_centroid: The Average Number of records per centroid in the data samples.
+ """
+
+ x._check_matrix_op()
+ if x._np_array.size == 0:
+ raise ValueError("Found array with 0 feature(s) (shape={s}) while a minimum of 1 is required."
+ .format(s=x._np_array.shape))
+
+ if 'k' in kwargs.keys() and kwargs.get('k') < 1:
+ raise ValueError("Invalid number of clusters in K means, number must be integer above 0")
+
+
+
+ params_dict = {'X': x}
+ params_dict.update(kwargs)
+ return OperationNode(x.sds_context, 'kmeans', named_input_nodes=params_dict)
diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py
index 76d06e7..fbe8d19 100644
--- a/src/main/python/systemds/operator/operation_node.py
+++ b/src/main/python/systemds/operator/operation_node.py
@@ -20,6 +20,7 @@
#-------------------------------------------------------------
from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING
+from multiprocessing import Process
import numpy as np
from py4j.java_gateway import JVMView, JavaObject
@@ -30,6 +31,8 @@
from systemds.script_building.script import DMLScript
from systemds.script_building.dag import OutputType, DAGNode
+
+
if TYPE_CHECKING:
# to avoid cyclic dependencies during runtime
from systemds.context import SystemDSContext
@@ -70,6 +73,10 @@
def compute(self, verbose: bool = False, lineage: bool = False) -> \
Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+
+
+
if self._result_var is None or self._lineage_trace is None:
self._script = DMLScript(self.sds_context)
self._script.build_code(self)
@@ -77,14 +84,21 @@
result_variables, self._lineage_trace = self._script.execute(lineage)
else:
result_variables = self._script.execute(lineage)
+
+ if verbose:
+ print("SCRIPT:")
+ print(self._script.dml_script)
+
if self.output_type == OutputType.DOUBLE:
self._result_var = result_variables.getDouble(self._script.out_var_name)
elif self.output_type == OutputType.MATRIX:
self._result_var = matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
result_variables.getMatrixBlock(self._script.out_var_name))
if verbose:
- print(self._script.dml_script)
- # TODO further info
+ for x in self.sds_context.get_stdout():
+ print(x)
+ for y in self.sds_context.get_stderr():
+ print(y)
if lineage:
return self._result_var, self._lineage_trace
diff --git a/src/main/python/tests/algorithms/__init__.py b/src/main/python/tests/algorithms/__init__.py
new file mode 100644
index 0000000..e66abb4
--- /dev/null
+++ b/src/main/python/tests/algorithms/__init__.py
@@ -0,0 +1,20 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
diff --git a/src/main/python/tests/algorithms/test_kmeans.py b/src/main/python/tests/algorithms/test_kmeans.py
new file mode 100644
index 0000000..3eef08e
--- /dev/null
+++ b/src/main/python/tests/algorithms/test_kmeans.py
@@ -0,0 +1,83 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import unittest
+
+import numpy as np
+from systemds.context import SystemDSContext
+from systemds.matrix import Matrix
+from systemds.operator.algorithm import kmeans
+
+
+class TestKMeans(unittest.TestCase):
+
+ sds: SystemDSContext = None
+
+ @classmethod
+ def setUpClass(cls):
+ cls.sds = SystemDSContext()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.sds.close()
+
+ def test_500x2(self):
+ """
+ This test is based on statistics, that if we run kmeans, on a normal distributed dataset, centered around 0
+ and use 4 clusters then they will be located in each one corner.
+ """
+ features = self.generate_matrices_for_k_means((500, 2), seed=1304)
+ res = kmeans(features, k=4).compute()
+
+ corners = set()
+ for x in res:
+ if x[0] > 0 and x[1] > 0:
+ corners.add("pp")
+ elif x[0] > 0 and x[1] < 0:
+ corners.add("pn")
+ elif x[0] < 0 and x[1] > 0:
+ corners.add("np")
+ else:
+ corners.add("nn")
+ self.assertTrue(len(corners) == 4)
+
+ def test_invalid_input_1(self):
+ features = Matrix(self.sds, np.array([]))
+ with self.assertRaises(ValueError) as context:
+ kmeans(features)
+
+ def test_invalid_input_2(self):
+ features = Matrix(self.sds, np.array([1]))
+ with self.assertRaises(ValueError) as context:
+ kmeans(features, k=-1)
+
+ def generate_matrices_for_k_means(self, dims: (int, int), seed: int = 1234):
+ np.random.seed(seed)
+ mu, sigma = 0, 0.1
+ s = np.random.normal(mu, sigma, dims[0] * dims[1])
+ m1 = np.array(s, dtype=np.double)
+ m1 = np.reshape(m1, (dims[0], dims[1]))
+
+ return Matrix(self.sds, m1)
+
+
+if __name__ == "__main__":
+ unittest.main(exit=False)
diff --git a/src/main/python/tests/test_l2svm.py b/src/main/python/tests/algorithms/test_l2svm.py
similarity index 100%
rename from src/main/python/tests/test_l2svm.py
rename to src/main/python/tests/algorithms/test_l2svm.py
diff --git a/src/main/python/tests/test_lm.py b/src/main/python/tests/algorithms/test_lm.py
similarity index 100%
rename from src/main/python/tests/test_lm.py
rename to src/main/python/tests/algorithms/test_lm.py
diff --git a/src/main/python/tests/onnx_systemds/__init__.py b/src/main/python/tests/onnx_systemds/__init__.py
index fe95886..e66abb4 100644
--- a/src/main/python/tests/onnx_systemds/__init__.py
+++ b/src/main/python/tests/onnx_systemds/__init__.py
@@ -1,3 +1,4 @@
+# -------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -16,3 +17,4 @@
# specific language governing permissions and limitations
# under the License.
#
+# -------------------------------------------------------------
diff --git a/src/main/python/tests/onnx_systemds/test_simple.py b/src/main/python/tests/onnx_systemds/test_simple.py
index 7963b77..d1bd17a 100644
--- a/src/main/python/tests/onnx_systemds/test_simple.py
+++ b/src/main/python/tests/onnx_systemds/test_simple.py
@@ -1,17 +1,23 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to you under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
+# -------------------------------------------------------------
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
import unittest
import tests.onnx_systemds.util as util
diff --git a/src/main/python/tests/onnx_systemds/util.py b/src/main/python/tests/onnx_systemds/util.py
index 60376d9..ff6cc48 100644
--- a/src/main/python/tests/onnx_systemds/util.py
+++ b/src/main/python/tests/onnx_systemds/util.py
@@ -1,17 +1,24 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to you under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
+# -------------------------------------------------------------
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
import os
import subprocess