blob: 0679018e987a0937c8c633331ddf65d6927e5296 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Install the following with pip
-- after downloading the mojo scoring pipeline from Driverless AI,
the following packages are needed to help make predictions:
pip install datatable pandas scipy
-- after downloading the mojo2 py runtime from Driverless AI depending on
your OS, you need to install the appropriate package:
# Install the MOJO2 Py runtime on Mac OS X
pip install path/to/daimojo-2.2.0-cp36-cp36m-macosx_10_7_x86_64.whl
# Install the MOJO2 Py runtime on Linux x86
pip install path/to/daimojo-2.2.0-cp36-cp36m-linux_x86_64.whl
# Install the MOJO2 Py runtime on Linux PPC
pip install path/to/daimojo-2.2.0-cp36-cp36m-linux_ppc64le.whl
"""
import codecs
import pandas as pd
import datatable as dt
from collections import Counter
from scipy.special._ufuncs import expit
import daimojo.model
def describe(processor):
    """Set the processor description shown in the NiFi UI.

    Describes how this processor runs H2O's MOJO Scoring Pipeline
    (C++ runtime Python wrapper) against tabular data in the incoming
    flow file content: real time scoring for a single row, batch
    scoring for multiple rows.
    """
    description = (
        "Executes H2O's MOJO Scoring Pipeline in C++ Runtime Python Wrapper "
        "to do batch scoring or real time scoring for one or more predicted label(s) on the tabular "
        "test data in the incoming flow file content. If tabular data is one row, then MOJO does "
        "real time scoring. If tabular data is multiple rows, then MOJO does batch scoring."
    )
    processor.setDescription(description)
def onInitialize(processor):
    """Declare the processor's configurable properties.

    Registers one required, non-dynamic property that holds the path to
    the MOJO pipeline file loaded by onTrigger.
    """
    help_text = (
        "Add the filepath to the MOJO pipeline file. For example, "
        "'path/to/mojo-pipeline/pipeline.mojo'."
    )
    processor.addProperty("MOJO Pipeline Filepath", help_text, "", True, False)
class ContentExtract(object):
    """Session read callback that captures a flow file's content as text.

    The session's read() invokes process() with the flow file's input
    stream; the UTF-8-decoded text is kept on self.content for later use.
    """

    def __init__(self):
        # Populated by process(); None until the stream has been read.
        self.content = None

    def process(self, input_stream):
        """Decode the stream as UTF-8, store the text, and return its length."""
        reader = codecs.getreader('utf-8')(input_stream)
        self.content = reader.read()
        return len(self.content)
class ContentWrite(object):
    """Session write callback that emits text to a flow file.

    Holds the text passed to the constructor; the session's write()
    invokes process() with the flow file's output stream.
    """

    def __init__(self, data):
        # Text that process() will encode and write out.
        self.content = data

    def process(self, output_stream):
        """Write self.content UTF-8-encoded to the stream; return its length."""
        writer = codecs.getwriter('utf-8')(output_stream)
        writer.write(self.content)
        return len(self.content)
def onTrigger(context, session):
    """Score the incoming flow file's tabular content with the MOJO pipeline.

    Reads the flow file content as tabular test data, runs H2O's MOJO
    scoring pipeline on it (real time scoring for one row, batch scoring
    for many rows), writes the predictions back as the new flow file
    content, and records scoring metadata as flow file attributes
    (mojo_creation_time, mojo_uuid, num_rows_scored, and one
    "<label>_pred_0" attribute per predicted label with the first row's
    score).

    :param context: processor context; supplies the value of the
        "MOJO Pipeline Filepath" property.
    :param session: process session used to read, write, and transfer
        the flow file.
    """
    flow_file = session.get()
    if flow_file is None:
        # Nothing queued right now; the framework will trigger again later.
        return
    # read test data of flow file content into read_cb.content
    read_cb = ContentExtract()
    session.read(flow_file, read_cb)
    # instantiate H2O's MOJO Scoring Pipeline scorer from the configured path
    mojo_pipeline_filepath = context.getProperty("MOJO Pipeline Filepath")
    m_scorer = daimojo.model(mojo_pipeline_filepath)
    # record the MOJO's creation time and uuid as flow file attributes
    flow_file.addAttribute("mojo_creation_time", m_scorer.created_time)
    flow_file.addAttribute("mojo_uuid", m_scorer.uuid)
    # predicted label name(s) making up the prediction header
    pred_header = m_scorer.output_names
    # load the tabular string (1 or more rows) into a datatable frame
    test_dt_frame = dt.Frame(read_cb.content)
    # if the frame's header doesn't match the scorer's expected feature names
    # (order-insensitive multiset comparison), overwrite it so predict()
    # sees the header it expects
    if Counter(test_dt_frame.names) != Counter(m_scorer.feature_names):
        test_dt_frame.names = tuple(m_scorer.feature_names)
    # score the test data; returns a dt frame with the predicted label(s)
    preds_dt_frame = m_scorer.predict(test_dt_frame)
    # convert predictions to pandas, then to a string without the df index,
    # and write that string as the new flow file content
    preds_df = preds_dt_frame.to_pandas()
    write_cb = ContentWrite(preds_df.to_string(index=False))
    session.write(flow_file, write_cb)
    # record how many rows were scored
    flow_file.addAttribute("num_rows_scored", str(preds_dt_frame.nrows))
    # one attribute per predicted label: that label's score for the first row
    for label in pred_header:
        ff_attr_name = label + "_pred_0"
        flow_file.addAttribute(ff_attr_name, str(preds_df.at[0, label]))
        # NOTE(review): `log` and `REL_SUCCESS` are presumably injected by the
        # NiFi scripting runtime — they are not defined in this file.
        log.info("getAttribute({}): {}".format(ff_attr_name, flow_file.getAttribute(ff_attr_name)))
    session.transfer(flow_file, REL_SUCCESS)