blob: 566f4e31260d215835e3d9639dcd2838eabc548e [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Install the following with pip
pip install datatable
pip install pandas
"""
import codecs
import pandas as pd # noqa F401
import datatable as dt
from io import StringIO
def describe(processor):
    """Set the human-readable description of this processor.

    The description is assembled from adjacent string literals rather than
    backslash continuations inside a single literal, so the indentation of
    the source lines can never end up inside the description text.

    Args:
        processor: object exposing ``setDescription(str)``.
    """
    processor.setDescription(
        "Converts the data source content of incoming flow file to csv. It "
        "supports a variety of data sources: pandas DataFrames, csv, numpy "
        "arrays, dictionary, list, raw Python objects, etc"
    )
def onInitialize(processor):
    """Initialization hook: the place to configure processor properties.

    The only configuration done here is calling
    ``processor.setSupportsDynamicProperties()``.
    """
    processor.setSupportsDynamicProperties()
class ContentExtract(object):
    """Read callback used with ``session.read``.

    ``process`` receives the flow file's input stream, decodes it as UTF-8
    and keeps the resulting text on ``self.content``.
    """

    def __init__(self):
        # Decoded text of the flow file; stays None until process() has run.
        self.content = None

    def process(self, input_stream):
        """Decode the whole input stream as UTF-8 and store the text.

        Returns the length of the decoded text (characters, not bytes).
        """
        utf8_reader = codecs.getreader('utf-8')(input_stream)
        self.content = utf8_reader.read()
        return len(self.content)
class ContentWrite(object):
    """Write callback used with ``session.write``.

    Holds the text to emit and, when ``process`` is called with an output
    stream, writes that text to the stream encoded as UTF-8.
    """

    def __init__(self, data):
        # Text that process() will encode and write out.
        self.content = data

    def process(self, output_stream):
        """Write the stored text to the output stream as UTF-8.

        Returns the length of the stored text (characters, not bytes).
        """
        utf8_writer = codecs.getwriter('utf-8')(output_stream)
        utf8_writer.write(self.content)
        return len(self.content)
def onTrigger(context, session):
    """Rewrite the next flow file's content as CSV and transfer it to success.

    Steps: fetch a flow file from the session, read its content as UTF-8
    text, parse that text with datatable, convert the frame to a pandas
    DataFrame, serialize it to CSV without the index column, write the CSV
    back to the same flow file and transfer it to ``REL_SUCCESS``.

    Args:
        context: processor context passed in by the caller (unused here).
        session: session providing ``get``, ``read``, ``write`` and
            ``transfer``.

    Any exception raised while parsing or converting propagates to the
    caller; the flow file is not transferred in that case.
    """
    flow_file = session.get()
    if flow_file is None:
        # No flow file available on this trigger; nothing to do.
        return

    # Read the flow file content into read_cb.content.
    read_cb = ContentExtract()
    session.read(flow_file, read_cb)

    # Parse the content explicitly as text. Passing a bare string to
    # dt.Frame() lets datatable guess the kind of source, so a single-line
    # payload could be taken for a file name or URL and opened instead of
    # being parsed as data.
    dt_frame = dt.fread(text=read_cb.content)
    pd_dframe = dt_frame.to_pandas()

    # With no target given, to_csv returns the CSV text directly, so no
    # intermediate buffer is needed. index=False drops the DataFrame index.
    csv_text = pd_dframe.to_csv(index=False)

    # Write the CSV text back to the flow file and route it onward.
    session.write(flow_file, ContentWrite(csv_text))
    # NOTE(review): REL_SUCCESS is not defined in this file; presumably it is
    # provided by the hosting script engine -- confirm.
    session.transfer(flow_file, REL_SUCCESS)