MINIFI-1201: Integrate H2O Driverless AI MSP in MiNFi (#766)

MINIFI-1201: Integrate H2O Driverless AI MSP in MiNFi (#766)
diff --git a/extensions/pythonprocessors/h2o/dai/msp/H2oMojoPwScoring.py b/extensions/pythonprocessors/h2o/dai/msp/H2oMojoPwScoring.py
new file mode 100644
index 0000000..0679018
--- /dev/null
+++ b/extensions/pythonprocessors/h2o/dai/msp/H2oMojoPwScoring.py
@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+    Install the following with pip
+
+    -- after downloading the mojo scoring pipeline from Driverless AI,
+       the following packages were needed for helping to make predictions
+
+    pip install datatable pandas scipy
+
+    -- after downloading the mojo2 py runtime from Driverless AI depending on
+       your OS, you need to install the appropriate package:
+
+    # Install the MOJO2 Py runtime on Mac OS X
+    pip install path/to/daimojo-2.2.0-cp36-cp36m-macosx_10_7_x86_64.whl
+    
+    # Install the MOJO2 Py runtime on Linux x86
+    pip install path/to/daimojo-2.2.0-cp36-cp36m-linux_x86_64.whl
+    
+    # Install the MOJO2 Py runtime on Linux PPC
+    pip install path/to/daimojo-2.2.0-cp36-cp36m-linux_ppc64le.whl
+"""
+import codecs
+import pandas as pd
+import datatable as dt
+from collections import Counter
+from scipy.special._ufuncs import expit
+import daimojo.model
+
+def describe(processor):
+    """ describe what this processor does
+    """
+    processor.setDescription("Executes H2O's MOJO Scoring Pipeline in C++ Runtime Python Wrapper \
+        to do batch scoring or real time scoring for one or more predicted label(s) on the tabular \
+        test data in the incoming flow file content. If tabular data is one row, then MOJO does \
+        real time scoring. If tabular data is multiple rows, then MOJO does batch scoring.")
+
+def onInitialize(processor):
+    """ onInitialize is where you can set properties
+    """
+    processor.addProperty("MOJO Pipeline Filepath", "Add the filepath to the MOJO pipeline file. For example, \
+        'path/to/mojo-pipeline/pipeline.mojo'.", "", True, False)
+
+class ContentExtract(object):
+    """ ContentExtract callback class is defined for reading streams of data through the session
+        and has a process function that accepts the input stream
+    """
+    def __init__(self):
+        self.content = None
+    
+    def process(self, input_stream):
+        """ Use codecs getReader to read that data
+        """
+        self.content = codecs.getreader('utf-8')(input_stream).read()
+        return len(self.content)
+
+class ContentWrite(object):
+    """ ContentWrite callback class is defined for writing streams of data through the session
+    """
+    def __init__(self, data):
+        self.content = data
+
+    def process(self, output_stream):
+        """ Use codecs getWriter to write data encoded to the stream
+        """
+        codecs.getwriter('utf-8')(output_stream).write(self.content)
+        return len(self.content)
+
+def onTrigger(context, session):
+    """ onTrigger is executed and passed processor context and session
+    """
+    flow_file = session.get()
+    # lambda compares two lists: does header equal expected header
+    compare = lambda header, exp_header: Counter(header) == Counter(exp_header)
+    if flow_file is not None:
+        # read test data of flow file content into read_cb.content
+        read_cb = ContentExtract()
+        session.read(flow_file, read_cb)
+        # instantiate H2O's MOJO Scoring Pipeline Scorer
+        mojo_pipeline_filepath = context.getProperty("MOJO Pipeline Filepath")
+        m_scorer = daimojo.model(mojo_pipeline_filepath)
+        # add flow file attribute for creation time of mojo
+        flow_file.addAttribute("mojo_creation_time", m_scorer.created_time)
+        # add flow file attribute for uuid of mojo
+        flow_file.addAttribute("mojo_uuid", m_scorer.uuid)
+        # get list of predicted label(s) for prediction header
+        pred_header = m_scorer.output_names
+        # load tabular data str of 1 or more rows into datatable frame
+        test_dt_frame = dt.Frame(read_cb.content)
+        # does test dt frame column names (header) equal m_scorer feature_names (exp_header)
+        if compare(test_dt_frame.names, m_scorer.feature_names) == False:
+            test_dt_frame.names = tuple(m_scorer.feature_names)
+        # do scoring on test data in the test_dt_frame, return dt frame with predicted label(s)
+        preds_dt_frame = m_scorer.predict(test_dt_frame)
+        # convert preds_dt_frame to pandas dataframe
+        preds_df = preds_dt_frame.to_pandas()
+        # convert pandas df to str without df index, then write to flow file
+        preds_df_str = preds_df.to_string(index=False)
+        write_cb = ContentWrite(preds_df_str)
+        session.write(flow_file, write_cb)
+        # add flow file attribute: number of rows to know how many rows were scored
+        flow_file.addAttribute("num_rows_scored", str(preds_dt_frame.nrows))
+        # add one or more flow file attributes: predicted label name and associated score pair
+        for i in range(len(pred_header)):
+            ff_attr_name = pred_header[i] + "_pred_0"
+            flow_file.addAttribute(ff_attr_name, str(preds_df.at[0,pred_header[i]]))
+            log.info("getAttribute({}): {}".format(ff_attr_name, flow_file.getAttribute(ff_attr_name)))
+        session.transfer(flow_file, REL_SUCCESS)
\ No newline at end of file