[SYSTEMDS-2645] Python API K-Means algorithm

This commit introduce the K Means algorithm to the python API.
Currently the python interface only allows to return one value not a
list, therefore only the clusters are returned from the call to kmeans.
diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py
index 757f451..16e99c8 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -95,9 +95,9 @@
         self.__stdout = Queue()
         self.__stderr = Queue()
 
-        Thread(target= self.__enqueue_output, args=(
+        Thread(target=self.__enqueue_output, args=(
             process.stdout, self.__stdout), daemon=True).start()
-        Thread(target= self.__enqueue_output, args=(
+        Thread(target=self.__enqueue_output, args=(
             process.stderr, self.__stderr), daemon=True).start()
 
         # Py4j connect to the started process.
@@ -106,22 +106,24 @@
         self.java_gateway = JavaGateway(
             gateway_parameters=gateway_parameters, java_process=process)
 
-    def get_stdout(self, lines: int = 1):
+    def get_stdout(self, lines: int = -1):
         """Getter for the stdout of the java subprocess
         The output is taken from the stdout queue and returned in a new list.
-        :param lines: The number of lines to try to read from the stdout queue
+        :param lines: The number of lines to try to read from the stdout queue.
+        default -1 prints all current lines in the queue.
         """
-        if self.__stdout.qsize() < lines:
+        if lines == -1 or self.__stdout.qsize() < lines:
             return [self.__stdout.get() for x in range(self.__stdout.qsize())]
         else:
             return [self.__stdout.get() for x in range(lines)]
 
-    def get_stderr(self, lines: int = 1):
+    def get_stderr(self, lines: int = -1):
         """Getter for the stderr of the java subprocess
         The output is taken from the stderr queue and returned in a new list.
-        :param lines: The number of lines to try to read from the stderr queue
+        :param lines: The number of lines to try to read from the stderr queue.
+        default -1 prints all current lines in the queue.
         """
-        if self.__stderr.qsize() < lines:
+        if lines == -1 or self.__stderr.qsize() < lines:
             return [self.__stderr.get() for x in range(self.__stderr.qsize())]
         else:
             return [self.__stderr.get() for x in range(lines)]
@@ -136,19 +138,18 @@
 
     def close(self):
         """Close the connection to the java process and do necessary cleanup."""
-        process : Popen = self.java_gateway.java_process
+        process: Popen = self.java_gateway.java_process
         self.java_gateway.shutdown()
         # Send SigTerm
         os.kill(process.pid, 14)
 
-    def __enqueue_output(self,out, queue):
+    def __enqueue_output(self, out, queue):
         """Method for handling the output from java.
         It is locating the string handeling inside a different thread, since the 'out.readline' is a blocking command.
         """
         for line in iter(out.readline, b""):
             queue.put(line.decode("utf-8").strip())
 
-
     def __get_open_port(self):
         """Get a random available port."""
         # TODO Verify that it is not taking some critical ports change to select a good port range.
diff --git a/src/main/python/systemds/operator/algorithm.py b/src/main/python/systemds/operator/algorithm.py
index a9014f8..be6c018 100644
--- a/src/main/python/systemds/operator/algorithm.py
+++ b/src/main/python/systemds/operator/algorithm.py
@@ -20,16 +20,18 @@
 # -------------------------------------------------------------
 
 from typing import Dict
+
+from systemds.operator import OperationNode
 from systemds.script_building.dag import DAGNode
 from systemds.utils.consts import VALID_INPUT_TYPES
-from systemds.operator import OperationNode
 
 __all__ = ['l2svm', 'lm']
 
+
 def l2svm(x: DAGNode, y: DAGNode, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> OperationNode:
     """
     Perform L2SVM on matrix with labels given.
-    
+
     :param x: Input dataset
     :param y: Input labels in shape of one column
     :param kwargs: Dictionary of extra arguments 
@@ -41,16 +43,16 @@
     return OperationNode(x.sds_context, 'l2svm', named_input_nodes=params_dict)
 
 
-def lm(x :DAGNode, y: DAGNode, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> OperationNode:
+def lm(x: DAGNode, y: DAGNode, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> OperationNode:
     """
     Performs LM on matrix with labels given.
-    
+
     :param x: Input dataset
     :param y: Input labels in shape of one column
     :param kwargs: Dictionary of extra arguments 
     :return: `OperationNode` containing the model fit.
     """
-    
+
     x._check_matrix_op()
     if x._np_array.size == 0:
         raise ValueError("Found array with 0 feature(s) (shape={s}) while a minimum of 1 is required."
@@ -58,8 +60,35 @@
     if y._np_array.size == 0:
         raise ValueError("Found array with 0 feature(s) (shape={s}) while a minimum of 1 is required."
                          .format(s=y._np_array.shape))
-    
+
     params_dict = {'X': x, 'y': y}
     params_dict.update(kwargs)
     return OperationNode(x.sds_context, 'lm', named_input_nodes=params_dict)
 
+
+def kmeans(x: DAGNode, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> OperationNode:
+    """
+    Perfoms KMeans on matrix input.
+
+    :param x: Input dataset to perform K-Means on.
+    :param k: The Number of centroids to use for the algorithm.
+    :param runs: The Number of concurrent instances of K-Means to run (with different initial centroids).
+    :param max_iter: The Maximum number of iterations to run the K-Means algorithm for.
+    :param eps: Tolerance for the algorithm to declare convergence using WCSS change ratio.
+    :param is_verbose: Boolean flag if the algorithm should be run in a verbose manner.
+    :param avg_sample_size_per_centroid: The Average Number of records per centroid in the data samples.
+    """
+
+    x._check_matrix_op()
+    if x._np_array.size == 0:
+        raise ValueError("Found array with 0 feature(s) (shape={s}) while a minimum of 1 is required."
+                         .format(s=x._np_array.shape))
+
+    if 'k' in kwargs.keys() and kwargs.get('k') < 1:
+        raise ValueError("Invalid number of clusters in K means, number must be integer above 0")
+
+
+
+    params_dict = {'X': x}
+    params_dict.update(kwargs)
+    return OperationNode(x.sds_context, 'kmeans', named_input_nodes=params_dict)
diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py
index 76d06e7..fbe8d19 100644
--- a/src/main/python/systemds/operator/operation_node.py
+++ b/src/main/python/systemds/operator/operation_node.py
@@ -20,6 +20,7 @@
 #-------------------------------------------------------------
 
 from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING
+from multiprocessing import Process
 
 import numpy as np
 from py4j.java_gateway import JVMView, JavaObject
@@ -30,6 +31,8 @@
 from systemds.script_building.script import DMLScript
 from systemds.script_building.dag import OutputType, DAGNode
 
+
+
 if TYPE_CHECKING:
     # to avoid cyclic dependencies during runtime
     from systemds.context import SystemDSContext
@@ -70,6 +73,10 @@
 
     def compute(self, verbose: bool = False, lineage: bool = False) -> \
             Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+
+
+
         if self._result_var is None or self._lineage_trace is None:
             self._script = DMLScript(self.sds_context)
             self._script.build_code(self)
@@ -77,14 +84,21 @@
                 result_variables, self._lineage_trace = self._script.execute(lineage)
             else:
                 result_variables = self._script.execute(lineage)
+
+            if verbose:
+                print("SCRIPT:")
+                print(self._script.dml_script)
+
             if self.output_type == OutputType.DOUBLE:
                 self._result_var = result_variables.getDouble(self._script.out_var_name)
             elif self.output_type == OutputType.MATRIX:
                 self._result_var = matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
                                                          result_variables.getMatrixBlock(self._script.out_var_name))
         if verbose:
-            print(self._script.dml_script)
-            # TODO further info
+            for x in self.sds_context.get_stdout():
+                print(x)
+            for y in self.sds_context.get_stderr():
+                print(y)
 
         if lineage:
             return self._result_var, self._lineage_trace
diff --git a/src/main/python/tests/algorithms/__init__.py b/src/main/python/tests/algorithms/__init__.py
new file mode 100644
index 0000000..e66abb4
--- /dev/null
+++ b/src/main/python/tests/algorithms/__init__.py
@@ -0,0 +1,20 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
diff --git a/src/main/python/tests/algorithms/test_kmeans.py b/src/main/python/tests/algorithms/test_kmeans.py
new file mode 100644
index 0000000..3eef08e
--- /dev/null
+++ b/src/main/python/tests/algorithms/test_kmeans.py
@@ -0,0 +1,83 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import unittest
+
+import numpy as np
+from systemds.context import SystemDSContext
+from systemds.matrix import Matrix
+from systemds.operator.algorithm import kmeans
+
+
+class TestKMeans(unittest.TestCase):
+
+    sds: SystemDSContext = None
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def test_500x2(self):
+        """
+        This test is based on statistics, that if we run kmeans, on a normal distributed dataset, centered around 0
+        and use 4 clusters then they will be located in each one corner.
+        """
+        features = self.generate_matrices_for_k_means((500, 2), seed=1304)
+        res = kmeans(features, k=4).compute()
+
+        corners = set()
+        for x in res:
+            if x[0] > 0 and x[1] > 0:
+                corners.add("pp")
+            elif x[0] > 0 and x[1] < 0:
+                corners.add("pn")
+            elif x[0] < 0 and x[1] > 0:
+                corners.add("np")
+            else:
+                corners.add("nn")
+        self.assertTrue(len(corners) == 4)
+
+    def test_invalid_input_1(self):
+        features = Matrix(self.sds, np.array([]))
+        with self.assertRaises(ValueError) as context:
+            kmeans(features)
+
+    def test_invalid_input_2(self):
+        features = Matrix(self.sds, np.array([1]))
+        with self.assertRaises(ValueError) as context:
+            kmeans(features, k=-1)
+
+    def generate_matrices_for_k_means(self, dims: (int, int), seed: int = 1234):
+        np.random.seed(seed)
+        mu, sigma = 0, 0.1
+        s = np.random.normal(mu, sigma,  dims[0] * dims[1])
+        m1 = np.array(s, dtype=np.double)
+        m1 = np.reshape(m1, (dims[0], dims[1]))
+
+        return Matrix(self.sds, m1)
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)
diff --git a/src/main/python/tests/test_l2svm.py b/src/main/python/tests/algorithms/test_l2svm.py
similarity index 100%
rename from src/main/python/tests/test_l2svm.py
rename to src/main/python/tests/algorithms/test_l2svm.py
diff --git a/src/main/python/tests/test_lm.py b/src/main/python/tests/algorithms/test_lm.py
similarity index 100%
rename from src/main/python/tests/test_lm.py
rename to src/main/python/tests/algorithms/test_lm.py
diff --git a/src/main/python/tests/onnx_systemds/__init__.py b/src/main/python/tests/onnx_systemds/__init__.py
index fe95886..e66abb4 100644
--- a/src/main/python/tests/onnx_systemds/__init__.py
+++ b/src/main/python/tests/onnx_systemds/__init__.py
@@ -1,3 +1,4 @@
+# -------------------------------------------------------------
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -16,3 +17,4 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+# -------------------------------------------------------------
diff --git a/src/main/python/tests/onnx_systemds/test_simple.py b/src/main/python/tests/onnx_systemds/test_simple.py
index 7963b77..d1bd17a 100644
--- a/src/main/python/tests/onnx_systemds/test_simple.py
+++ b/src/main/python/tests/onnx_systemds/test_simple.py
@@ -1,17 +1,23 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to you under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
+# -------------------------------------------------------------
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
 
 import unittest
 import tests.onnx_systemds.util as util
diff --git a/src/main/python/tests/onnx_systemds/util.py b/src/main/python/tests/onnx_systemds/util.py
index 60376d9..ff6cc48 100644
--- a/src/main/python/tests/onnx_systemds/util.py
+++ b/src/main/python/tests/onnx_systemds/util.py
@@ -1,17 +1,24 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to you under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
+# -------------------------------------------------------------
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
 
 import os
 import subprocess