MINIFI-1201: Integrate H2O Driverless AI MSP in MiNiFi (#766)
diff --git a/extensions/pythonprocessors/h2o/dai/msp/H2oMojoPwScoring.py b/extensions/pythonprocessors/h2o/dai/msp/H2oMojoPwScoring.py
new file mode 100644
index 0000000..0679018
--- /dev/null
+++ b/extensions/pythonprocessors/h2o/dai/msp/H2oMojoPwScoring.py
@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+ Install the following with pip
+
+ -- after downloading the mojo scoring pipeline from Driverless AI,
+ the following packages were needed for helping to make predictions
+
+ pip install datatable pandas scipy
+
+ -- after downloading the mojo2 py runtime from Driverless AI depending on
+ your OS, you need to install the appropriate package:
+
+ # Install the MOJO2 Py runtime on Mac OS X
+ pip install path/to/daimojo-2.2.0-cp36-cp36m-macosx_10_7_x86_64.whl
+
+ # Install the MOJO2 Py runtime on Linux x86
+ pip install path/to/daimojo-2.2.0-cp36-cp36m-linux_x86_64.whl
+
+ # Install the MOJO2 Py runtime on Linux PPC
+ pip install path/to/daimojo-2.2.0-cp36-cp36m-linux_ppc64le.whl
+"""
+import codecs
+import pandas as pd
+import datatable as dt
+from collections import Counter
+from scipy.special._ufuncs import expit
+import daimojo.model
+
+def describe(processor):
+ """ describe what this processor does
+ """
+ processor.setDescription("Executes H2O's MOJO Scoring Pipeline in C++ Runtime Python Wrapper \
+ to do batch scoring or real time scoring for one or more predicted label(s) on the tabular \
+ test data in the incoming flow file content. If tabular data is one row, then MOJO does \
+ real time scoring. If tabular data is multiple rows, then MOJO does batch scoring.")
+
+def onInitialize(processor):
+ """ onInitialize is where you can set properties
+ """
+ processor.addProperty("MOJO Pipeline Filepath", "Add the filepath to the MOJO pipeline file. For example, \
+ 'path/to/mojo-pipeline/pipeline.mojo'.", "", True, False)
+
+class ContentExtract(object):
+ """ ContentExtract callback class is defined for reading streams of data through the session
+ and has a process function that accepts the input stream
+ """
+ def __init__(self):
+ self.content = None
+
+ def process(self, input_stream):
+ """ Use codecs getReader to read that data
+ """
+ self.content = codecs.getreader('utf-8')(input_stream).read()
+ return len(self.content)
+
+class ContentWrite(object):
+ """ ContentWrite callback class is defined for writing streams of data through the session
+ """
+ def __init__(self, data):
+ self.content = data
+
+ def process(self, output_stream):
+ """ Use codecs getWriter to write data encoded to the stream
+ """
+ codecs.getwriter('utf-8')(output_stream).write(self.content)
+ return len(self.content)
+
+def onTrigger(context, session):
+ """ onTrigger is executed and passed processor context and session
+ """
+ flow_file = session.get()
+ # lambda compares two lists: does header equal expected header
+ compare = lambda header, exp_header: Counter(header) == Counter(exp_header)
+ if flow_file is not None:
+ # read test data of flow file content into read_cb.content
+ read_cb = ContentExtract()
+ session.read(flow_file, read_cb)
+ # instantiate H2O's MOJO Scoring Pipeline Scorer
+ mojo_pipeline_filepath = context.getProperty("MOJO Pipeline Filepath")
+ m_scorer = daimojo.model(mojo_pipeline_filepath)
+ # add flow file attribute for creation time of mojo
+ flow_file.addAttribute("mojo_creation_time", m_scorer.created_time)
+ # add flow file attribute for uuid of mojo
+ flow_file.addAttribute("mojo_uuid", m_scorer.uuid)
+ # get list of predicted label(s) for prediction header
+ pred_header = m_scorer.output_names
+ # load tabular data str of 1 or more rows into datatable frame
+ test_dt_frame = dt.Frame(read_cb.content)
+ # does test dt frame column names (header) equal m_scorer feature_names (exp_header)
+ if compare(test_dt_frame.names, m_scorer.feature_names) == False:
+ test_dt_frame.names = tuple(m_scorer.feature_names)
+ # do scoring on test data in the test_dt_frame, return dt frame with predicted label(s)
+ preds_dt_frame = m_scorer.predict(test_dt_frame)
+ # convert preds_dt_frame to pandas dataframe
+ preds_df = preds_dt_frame.to_pandas()
+ # convert pandas df to str without df index, then write to flow file
+ preds_df_str = preds_df.to_string(index=False)
+ write_cb = ContentWrite(preds_df_str)
+ session.write(flow_file, write_cb)
+ # add flow file attribute: number of rows to know how many rows were scored
+ flow_file.addAttribute("num_rows_scored", str(preds_dt_frame.nrows))
+ # add one or more flow file attributes: predicted label name and associated score pair
+ for i in range(len(pred_header)):
+ ff_attr_name = pred_header[i] + "_pred_0"
+ flow_file.addAttribute(ff_attr_name, str(preds_df.at[0,pred_header[i]]))
+ log.info("getAttribute({}): {}".format(ff_attr_name, flow_file.getAttribute(ff_attr_name)))
+ session.transfer(flow_file, REL_SUCCESS)
\ No newline at end of file