Merge pull request #126 from castleguarders/patch-3

Fix broken ingest_summary generation
diff --git a/DISCLAIMER b/DISCLAIMER
new file mode 100644
index 0000000..907de92
--- /dev/null
+++ b/DISCLAIMER
@@ -0,0 +1,11 @@
+DISCLAIMER
+
+Apache SPOT (incubating) is an effort undergoing incubation at the Apache
+Software Foundation (ASF), sponsored by the Apache Incubator PMC.
+Incubation is required of all newly accepted projects until a further review
+indicates that the infrastructure, communications, and decision making process
+have stabilized in a manner consistent with other successful ASF projects.
+
+While incubation status is not necessarily a reflection of the completeness or
+stability of the code, it does indicate that the project has yet to be fully
+endorsed by the ASF.
diff --git a/README.md b/README.md
index 13888ea..27a85d1 100644
--- a/README.md
+++ b/README.md
@@ -71,8 +71,8 @@
 
 Our Central repository for our Apache Spot solution is found here. If you find a bug, have question or something to discuss please contact us:
 
-* [Create an Issue](https://issues.apache.org/jira/browse/SPOT-20?jql=project%20%3D%20SPOT). 
-* [Go to our Slack channel](https://apachespot.slack.com/messages/general/). 
+* [Create an Issue](https://issues.apache.org/jira/browse/SPOT-20?jql=project%20%3D%20SPOT)
+* [Join the Dev List](mailto:issues-subscribe@spot.incubator.apache.org) and then [send us a message](mailto:dev@spot.incubator.apache.org)
 
 ## **Contributing to Apache Spot**
 
@@ -86,7 +86,7 @@
 * Fork the repo of the module that you wish to commit to.
 * Create a Branch, we use [topic branches](https://git-scm.com/book/en/v2/Git-Branching-Branching-Workflows#Topic-Branches) for our commits. 
 * Push your commit(s) to your repository.
-* Create a pull request to the original repo in Apache Spot organization.
+* Create a pull request to the original repo in Apache Spot organization. *(See Below for Merging details)*
 
 ### **Commit Guidelines**
 
@@ -94,10 +94,17 @@
 * Please be clear with the commit messages about what you are fixing or adding to the code base. If you code is addressing an open issue please add the reference to the issue in the comments with: Fix: Issue's URL. 
 
 
-### **Merge approval**
+### **Merge Process**
 
-Apache Spot maintainers use +1 in a comment on the code review to indicate acceptance, 
-at least 3 "+1" from maintainers are required to approve the merge. If you have any question or concern please feel free to add a comment in your pull request or branch and tag any of the maintainers.
+Thanks for considering contributing to the Spot Project. In order to help make the process a little easier for everyone, please follow these steps.
+1) In order to start the merge process please open a ticket in the [Spot Jira](https://issues.apache.org/jira/projects/SPOT/issues) and take note of the Issue key *(SPOT-###)*. 
+2) Next, open a Pull-Request (PR) and reference the Issue key in the title of the PR. If you have any question or concern please feel free to add a comment in your pull request or branch and tag any of the maintainers.
+3) Now, it's time for the community to provide feedback on your commit. Getting community feedback can be hard, but start by sending a message to the Dev list; and make sure you're [subscribed](mailto:issues-subscribe@spot.incubator.apache.org) to the Dev List.
+4) At the same time, maintainers will be taking a look at your PR. The more community input you can get, in the form of comments rather than +1s, the more attention maintainers will give. 
+There is a manual and an automatic merge process:
+   * **Manual:** When a Project maintainer has given a '+1' in the comments, then your PR has been accepted. However, it must be manually merged by a maintainer at this point.
+   * **Automatic:** This process initiates when 3 maintainers provide a '+1'
+   * **Note:** if there are any merge conflicts you will have to come back and fix them before the process can continue.
 
 
 ## **Licensing**
diff --git a/issues-subscribe@spot.incubator.apache.org b/issues-subscribe@spot.incubator.apache.org
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/issues-subscribe@spot.incubator.apache.org
diff --git a/spot-ingest/README.md b/spot-ingest/README.md
index acfb382..ce4f4cc 100644
--- a/spot-ingest/README.md
+++ b/spot-ingest/README.md
@@ -20,6 +20,12 @@
 ### Install
 1. Install Python dependencies `pip install -r requirements.txt` 
 
+Optional:
+2. the sasl python package requires the following:
+   * Centos: `yum install cyrus-sasl-devel`
+   * Debian/Ubuntu: `apt-get install libsasl2-dev`
+3. install Python dependencies for Kerberos `pip install -r kerberos-requirements.txt`
+
 ### Configure Kafka
 **Adding Kafka Service:**
 
diff --git a/spot-ingest/common/configurator.py b/spot-ingest/common/configurator.py
new file mode 100644
index 0000000..f8defc6
--- /dev/null
+++ b/spot-ingest/common/configurator.py
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import ConfigParser
+from io import open
+
+
+def configuration():
+    """Parse /etc/spot.conf into a ConfigParser, exposing its keys under a [conf] section."""
+    config = ConfigParser.ConfigParser()
+
+    try:
+        conf = open("/etc/spot.conf", "r")
+    except (OSError, IOError) as e:
+        print("Error opening: spot.conf" + " error: " + str(e.errno))  # errno is an int
+        raise
+    with conf:  # the handle is only needed while parsing, so release it afterwards
+        config.readfp(SecHead(conf))
+    return config
+
+
+def db():
+    return get_conf('DBNAME')  # value of the DBNAME setting
+
+
+def impala():
+    return get_conf('IMPALA_DEM'), get_conf('IMPALA_PORT')  # (IMPALA_DEM, IMPALA_PORT) settings as a tuple
+
+
+def hdfs():  # returns (NAME_NODE, WEB_PORT, last "/"-separated piece of HUSER)
+    return get_conf('NAME_NODE'), get_conf('WEB_PORT'), get_conf('HUSER').split("/")[-1]
+
+
+def spot():
+    return get_conf('HUSER')  # the full HUSER setting
+
+
+def kerberos_enabled():
+    """Report whether Kerberos is switched on.
+
+    True when the KERBEROS setting equals 'true', compared case-insensitively.
+    """
+    return get_conf('KERBEROS').lower() == 'true'
+
+
+def kerberos():
+    """Return (PRINCIPAL, KEYTAB, SASL_MECH, SECURITY_PROTO); raises KeyError when Kerberos is off."""
+    if not kerberos_enabled():
+        raise KeyError
+    return get_conf('PRINCIPAL'), get_conf('KEYTAB'), get_conf('SASL_MECH'), get_conf('SECURITY_PROTO')
+
+
+def ssl_enabled():
+    """Report whether SSL is switched on.
+
+    True when the SSL setting equals 'true', compared case-insensitively.
+    """
+    return get_conf('SSL').lower() == 'true'
+
+
+def ssl():
+    """Return (SSL_VERIFY, CA_LOCATION, CERT, KEY); raises KeyError when SSL is off."""
+    if not ssl_enabled():
+        raise KeyError
+    return get_conf('SSL_VERIFY'), get_conf('CA_LOCATION'), get_conf('CERT'), get_conf('KEY')
+
+
+def get_conf(key):
+    """Look up key in the [conf] section, drop quote characters, and return it ASCII-encoded."""
+    raw = configuration().get('conf', key)
+    unquoted = raw.replace("'", "").replace('"', '')
+    return unquoted.encode('ascii', 'ignore')
+
+
+class SecHead(object):
+    """File-like wrapper that yields a '[conf]' header line before the wrapped file's lines."""
+    def __init__(self, fp):
+        self.fp = fp
+        self.sechead = '[conf]\n'
+
+    def readline(self):
+        # The first call hands out the synthetic section header and clears it;
+        # every later call is forwarded to the wrapped file object.
+        if not self.sechead:
+            return self.fp.readline()
+        header, self.sechead = self.sechead, None
+        return header
diff --git a/spot-ingest/common/hdfs_client.py b/spot-ingest/common/hdfs_client.py
new file mode 100644
index 0000000..5605e9c
--- /dev/null
+++ b/spot-ingest/common/hdfs_client.py
@@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from hdfs import InsecureClient
+from hdfs.util import HdfsError
+from hdfs import Client
+from hdfs.ext.kerberos import KerberosClient
+from requests import Session
+from json import dump
+from threading import Lock
+import logging
+import configurator as Config
+from sys import stderr
+
+
+class Progress(object):
+
+    """Basic progress tracker callback."""
+
+    def __init__(self, hdfs_path, nbytes):
+        self._data = {}  # hdfs path -> byte count, filled in by __call__
+        self._lock = Lock()  # held for the whole body of __call__
+        self._hpath = hdfs_path
+        self._nbytes = nbytes
+
+    def __call__(self):  # NOTE(review): takes no arguments — confirm against what the hdfs client passes to `progress`
+        with self._lock:
+            if self._nbytes >= 0:
+                self._data[self._hpath] = self._nbytes
+            else:  # negative byte count: write the sum of all recorded counts to stderr
+                stderr.write('%s\n' % (sum(self._data.values()), ))
+
+
+class SecureKerberosClient(KerberosClient):
+
+    """A new client subclass for handling HTTPS connections with Kerberos.
+
+    :param url: URL to namenode.
+    :param cert: Local certificate. See `requests` documentation for details
+      on how to use this.
+    :param verify: Whether to check the host's certificate. WARNING: non production use only
+    :param \*\*kwargs: Keyword arguments passed to the default `Client`
+      constructor.
+
+    """
+    # mutual_auth is forwarded positionally to KerberosClient; verify is compared as the strings 'true'/'false'.
+    def __init__(self, url, mutual_auth, cert=None, verify='true', **kwargs):
+
+        self._logger = logging.getLogger("SPOT.INGEST.HDFS_client")
+        session = Session()
+        # cert is applied only under verify == 'true'; a comma-separated cert becomes a list of stripped paths.
+        if verify == 'true':
+            self._logger.info('SSL verification enabled')
+            session.verify = True
+            if cert is not None:
+                self._logger.info('SSL Cert: ' + cert)
+                if ',' in cert:
+                    session.cert = [path.strip() for path in cert.split(',')]
+                else:
+                    session.cert = cert
+        elif verify == 'false':
+            session.verify = False
+        # Any other verify value leaves the Session's defaults untouched.
+        super(SecureKerberosClient, self).__init__(url, mutual_auth, session=session, **kwargs)
+
+
+class HdfsException(HdfsError):
+    def __init__(self, message):  # message goes to HdfsError.__init__ and is also kept on self.message
+        super(HdfsException, self).__init__(message)
+        self.message = message
+
+
+def get_client(user=None):
+    # type: (object) -> Client
+    # Builds a SecureKerberosClient from the HDFS, SSL and Kerberos settings exposed by the Config module.
+    logger = logging.getLogger('SPOT.INGEST.HDFS.get_client')
+    hdfs_nm, hdfs_port, hdfs_user = Config.hdfs()
+    conf = {'url': '{0}:{1}'.format(hdfs_nm, hdfs_port),
+            'mutual_auth': 'OPTIONAL'
+            }
+
+    if Config.ssl_enabled():
+        ssl_verify, ca_location, cert, key = Config.ssl()
+        conf.update({'verify': ssl_verify.lower()})
+        if cert:
+            conf.update({'cert': cert})
+
+    if Config.kerberos_enabled():
+        # TODO: handle other conditions
+        krb_conf = {'mutual_auth': 'OPTIONAL'}
+        conf.update(krb_conf)
+    # NOTE(review): `user` and `hdfs_user` are unused below, and the client is Kerberos-based even when Kerberos is off.
+    # TODO: possible user parameter
+    logger.info('Client conf:')
+    for k,v in conf.iteritems():
+        logger.info(k + ': ' + v)
+
+    client = SecureKerberosClient(**conf)
+
+    return client
+
+
+def get_file(hdfs_file, client=None):
+    """Return the full contents of hdfs_file.
+
+    A client is created with get_client() when the caller does not supply one.
+    """
+    with (client or get_client()).read(hdfs_file) as reader:
+        return reader.read()
+
+
+def upload_file(hdfs_fp, local_fp, overwrite=False, client=None):
+    """Upload local_fp to hdfs_fp; on HdfsError the error object is returned rather than raised."""
+    hdfs = client or get_client()
+
+    try:
+        outcome = hdfs.upload(hdfs_fp, local_fp, overwrite=overwrite, progress=Progress)
+    except HdfsError as failure:
+        return failure
+    return outcome
+
+
+def download_file(hdfs_path, local_path, overwrite=False, client=None):
+    """Download hdfs_path to local_path; True on success, False when HdfsError is raised."""
+    hdfs = client or get_client()
+
+    try:
+        hdfs.download(hdfs_path, local_path, overwrite=overwrite)
+    except HdfsError:
+        return False
+    return True
+
+
+def mkdir(hdfs_path, client=None):
+    """Create hdfs_path via client.makedirs; True on success, False when HdfsError is raised."""
+    hdfs = client or get_client()
+
+    try:
+        hdfs.makedirs(hdfs_path)
+    except HdfsError:
+        return False
+    return True
+
+
+def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
+    """Write hdfs_file_content to <hdfs_path>/<hdfs_file_name>, one comma-joined line per item.
+
+    Fields are str()-converted. Returns True, or False when HdfsError is raised.
+    """
+    hdfs = client or get_client()
+    try:
+        target = "{0}/{1}".format(hdfs_path,hdfs_file_name)
+        with hdfs.write(target,append=append_file,overwrite=overwrite_file) as writer:
+            for row in hdfs_file_content:
+                writer.write("{0}\n".format(','.join(str(field) for field in row)))
+    except HdfsError:
+        return False
+    return True
+
+
+def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
+    """Dump hdfs_file_content as JSON (utf-8) into <hdfs_path>/<hdfs_file_name>; True, or False on HdfsError."""
+    hdfs = client or get_client()
+
+    try:
+        target = "{0}/{1}".format(hdfs_path,hdfs_file_name)
+        with hdfs.write(target,append=append_file,overwrite=overwrite_file,encoding='utf-8') as writer:
+            dump(hdfs_file_content, writer)
+    except HdfsError:
+        return False
+    return True
+
+
+def delete_folder(hdfs_file, user=None, client=None):
+    """Recursively delete hdfs_file; True when no HdfsError is raised, False otherwise (`user` is unused)."""
+    client = client or get_client()
+    try:
+        client.delete(hdfs_file,recursive=True)
+        return True  # previously fell through and returned None, which callers could not tell from failure
+    except HdfsError:
+        return False
+
+
+def check_dir(hdfs_path, client=None):
+    """
+    Returns True if directory exists
+    Returns False if directory does not exist
+    : param hdfs_path: path to check
+    : object client: hdfs client object for persistent connection
+    """
+    if not client:
+        client = get_client()
+
+    try:
+        result = client.list(hdfs_path)
+    except HdfsError:  # list() raises for a missing path instead of returning a marker value
+        return False
+    return None not in result
+
+
+def list_dir(hdfs_path, client=None):
+    """List the entries under hdfs_path; an empty dict is returned when HdfsError is raised."""
+    hdfs = client or get_client()
+    try:
+        entries = hdfs.list(hdfs_path)
+    except HdfsError:
+        return {}
+    return entries
+
+
+def file_exists(hdfs_path, file_name, client=None):
+    """Tell whether str(file_name) is among the entries list_dir reports for hdfs_path.
+
+    A client is created up front when none is passed, then handed on to list_dir.
+    """
+    hdfs = client if client else get_client()
+    # list_dir yields {} on HdfsError, so an unreadable path simply reports False
+    entries = list_dir(hdfs_path, hdfs)
+    return str(file_name) in entries
diff --git a/spot-ingest/common/hive_engine.py b/spot-ingest/common/hive_engine.py
new file mode 100644
index 0000000..eb3d79e
--- /dev/null
+++ b/spot-ingest/common/hive_engine.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from impala.dbapi import connect
+import common.configurator as config
+
+
+def create_connection():
+    # Connects through impala.dbapi.connect with the configured host, port and database; returns a cursor.
+    host, port = config.hive()  # NOTE(review): common/configurator.py in this patch defines impala() but no hive() — confirm
+    conf = {}
+
+    # TODO: if using hive, kerberos service name must be changed, impyla sets 'impala' as default
+    conf.update({'kerberos_service_name': 'hive'})
+
+    if config.kerberos_enabled():
+        principal, keytab, sasl_mech, security_proto = config.kerberos()
+        conf.update({'auth_mechanism': 'GSSAPI',
+                     })
+    else:
+        conf.update({'auth_mechanism': 'PLAIN',
+                     })
+    # NOTE(review): use_ssl below receives the SSL_VERIFY string, not a bool — confirm what connect() expects.
+    if config.ssl_enabled():
+        ssl_verify, ca_location, cert, key = config.ssl()
+        if ssl_verify.lower() == 'false':
+            conf.update({'use_ssl': ssl_verify})
+        else:
+            conf.update({'ca_cert': cert,
+                         'use_ssl': ssl_verify
+                         })
+
+    db = config.db()
+    conn = connect(host=host, port=int(port), database=db, **conf)
+    return conn.cursor()
+
+
+def execute_query(query,fetch=False):
+    """Run query on a fresh connection; return fetchall() rows when fetch is set, else the cursor."""
+    cursor = create_connection()
+    cursor.execute(query)
+
+    return cursor.fetchall() if fetch else cursor
+
+
+def execute_query_as_list(query):
+    """Run query and return every row as a {column name: value} dict.
+
+    Column names come from the first element of each cursor.description entry.
+    """
+    cursor = execute_query(query)
+    records = []
+
+    for row in cursor:
+        record = {}
+        for position, column in enumerate(cursor.description):
+            record[column[0]] = row[position]
+        records.append(record)
+
+    return records
diff --git a/spot-ingest/common/kafka_client.py b/spot-ingest/common/kafka_client.py
index 977cb92..15441b2 100755
--- a/spot-ingest/common/kafka_client.py
+++ b/spot-ingest/common/kafka_client.py
@@ -19,23 +19,23 @@
 
 import logging
 import os
+import sys
 from common.utils import Util
-from kafka import KafkaProducer
-from kafka import KafkaConsumer as KC
-from kafka.partitioner.roundrobin import RoundRobinPartitioner
-from kafka.common import TopicPartition
-
-class KafkaTopic(object):
+from confluent_kafka import Producer
+from confluent_kafka import Consumer
+import common.configurator as config
 
 
-    def __init__(self,topic,server,port,zk_server,zk_port,partitions):
+class KafkaProducer(object):
 
-        self._initialize_members(topic,server,port,zk_server,zk_port,partitions)
+    def __init__(self, topic, server, port, zk_server, zk_port, partitions):
 
-    def _initialize_members(self,topic,server,port,zk_server,zk_port,partitions):
+        self._initialize_members(topic, server, port, zk_server, zk_port, partitions)
+
+    def _initialize_members(self, topic, server, port, zk_server, zk_port, partitions):
 
         # get logger isinstance
-        self._logger = logging.getLogger("SPOT.INGEST.KAFKA")
+        self._logger = logging.getLogger("SPOT.INGEST.KafkaProducer")
 
         # kafka requirements
         self._server = server
@@ -46,42 +46,93 @@
         self._num_of_partitions = partitions
         self._partitions = []
         self._partitioner = None
+        self._kafka_brokers = '{0}:{1}'.format(self._server, self._port)
 
         # create topic with partitions
         self._create_topic()
 
+        self._kafka_conf = self._producer_config(self._kafka_brokers)
+
+        self._p = Producer(**self._kafka_conf)
+
+    def _producer_config(self, server):
+        # type: (str) -> dict
+        """Returns a configuration dictionary containing optional values"""
+
+        connection_conf = {
+            'bootstrap.servers': server,
+        }
+
+        if os.environ.get('KAFKA_DEBUG'):
+            connection_conf.update({'debug': 'all'})
+
+        if config.kerberos_enabled():
+            self._logger.info('Kerberos enabled')
+            principal, keytab, sasl_mech, security_proto = config.kerberos()
+            connection_conf.update({
+                'sasl.mechanisms': sasl_mech,
+                'security.protocol': security_proto,
+                'sasl.kerberos.principal': principal,
+                'sasl.kerberos.keytab': keytab,
+                'sasl.kerberos.min.time.before.relogin': 6000
+            })
+
+            sn = os.environ.get('KAFKA_SERVICE_NAME')
+            if sn:
+                self._logger.info('Setting Kerberos service name: ' + sn)
+                connection_conf.update({'sasl.kerberos.service.name': sn})
+
+            kinit_cmd = os.environ.get('KAFKA_KINIT')
+            if kinit_cmd:
+                self._logger.info('using kinit command: ' + kinit_cmd)
+                connection_conf.update({'sasl.kerberos.kinit.cmd': kinit_cmd})
+            else:
+                # NOTE(review): using -S %{sasl.kerberos.service.name}/%{broker.name} is said to refresh the ticket
+                # cache, causing authentication errors for other services — yet this default passes -S; confirm.
+                connection_conf.update({
+                    'sasl.kerberos.kinit.cmd': 'kinit -S "%{sasl.kerberos.service.name}/%{broker.name}" -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}'
+                })
+
+        if config.ssl_enabled():
+            self._logger.info('Using SSL connection settings')
+            ssl_verify, ca_location, cert, key = config.ssl()
+            connection_conf.update({
+                'ssl.certificate.location': cert,
+                'ssl.ca.location': ca_location,
+                'ssl.key.location': key
+            })
+
+        return connection_conf
+
     def _create_topic(self):
 
-        self._logger.info("Creating topic: {0} with {1} parititions".format(self._topic,self._num_of_partitions))     
-
-        # Create partitions for the workers.
-        self._partitions = [ TopicPartition(self._topic,p) for p in range(int(self._num_of_partitions))]        
-
-        # create partitioner
-        self._partitioner = RoundRobinPartitioner(self._partitions)
+        self._logger.info("Creating topic: {0} with {1} parititions".format(self._topic, self._num_of_partitions))
         
         # get script path 
-        zk_conf = "{0}:{1}".format(self._zk_server,self._zk_port)
-        create_topic_cmd = "{0}/kafka_topic.sh create {1} {2} {3}".format(os.path.dirname(os.path.abspath(__file__)),self._topic,zk_conf,self._num_of_partitions)
+        zk_conf = "{0}:{1}".format(self._zk_server, self._zk_port)
+        create_topic_cmd = "{0}/kafka_topic.sh create {1} {2} {3}".format(
+            os.path.dirname(os.path.abspath(__file__)),
+            self._topic,
+            zk_conf,
+            self._num_of_partitions
+        )
 
         # execute create topic cmd
-        Util.execute_cmd(create_topic_cmd,self._logger)
+        Util.execute_cmd(create_topic_cmd, self._logger)
 
-    def send_message(self,message,topic_partition):
+    def SendMessage(self, message, topic):  # produces the UTF-8 encoded message to topic, then polls and flushes
+        p = self._p
+        p.produce(topic, message.encode('utf-8'), callback=self._delivery_callback)
+        p.poll(0)
+        p.flush(timeout=3600000)  # NOTE(review): confluent_kafka documents this timeout in seconds — confirm 3600000 is intended
 
-        self._logger.info("Sending message to: Topic: {0} Partition:{1}".format(self._topic,topic_partition))
-        kafka_brokers = '{0}:{1}'.format(self._server,self._port)             
-        producer = KafkaProducer(bootstrap_servers=[kafka_brokers],api_version_auto_timeout_ms=3600000)
-        future = producer.send(self._topic,message,partition=topic_partition)
-        producer.flush(timeout=3600000)
-        producer.close()
-    
     @classmethod
-    def SendMessage(cls,message,kafka_servers,topic,partition=0):
-        producer = KafkaProducer(bootstrap_servers=kafka_servers,api_version_auto_timeout_ms=3600000)
-        future = producer.send(topic,message,partition=partition)
-        producer.flush(timeout=3600000)
-        producer.close()  
+    def _delivery_callback(cls, err, msg):
+        if err:
+            sys.stderr.write('%% Message failed delivery: %s\n' % err)
+        else:
+            sys.stderr.write('%% Message delivered to %s [%d]\n' %
+                             (msg.topic(), msg.partition()))
 
     @property
     def Topic(self):
@@ -93,22 +144,24 @@
 
     @property
     def Zookeeper(self):
-        zk = "{0}:{1}".format(self._zk_server,self._zk_port)
+        zk = "{0}:{1}".format(self._zk_server, self._zk_port)
         return zk
 
     @property
     def BootstrapServers(self):
-        servers = "{0}:{1}".format(self._server,self._port) 
+        servers = "{0}:{1}".format(self._server, self._port)
         return servers
 
 
 class KafkaConsumer(object):
     
-    def __init__(self,topic,server,port,zk_server,zk_port,partition):
+    def __init__(self, topic, server, port, zk_server, zk_port, partition):
 
-        self._initialize_members(topic,server,port,zk_server,zk_port,partition)
+        self._initialize_members(topic, server, port, zk_server, zk_port, partition)
 
-    def _initialize_members(self,topic,server,port,zk_server,zk_port,partition):
+    def _initialize_members(self, topic, server, port, zk_server, zk_port, partition):
+
+        self._logger = logging.getLogger("SPOT.INGEST.KafkaConsumer")
 
         self._topic = topic
         self._server = server
@@ -116,14 +169,64 @@
         self._zk_server = zk_server
         self._zk_port = zk_port
         self._id = partition
+        self._kafka_brokers = '{0}:{1}'.format(self._server, self._port)
+        self._kafka_conf = self._consumer_config(self._id, self._kafka_brokers)
+
+    def _consumer_config(self, groupid, server):
+        # type: (object, str) -> dict
+        """Returns a configuration dictionary containing optional values"""
+
+        connection_conf = {
+            'bootstrap.servers': server,
+            'group.id': groupid,
+        }
+
+        if config.kerberos_enabled():
+            self._logger.info('Kerberos enabled')
+            principal, keytab, sasl_mech, security_proto = config.kerberos()
+            connection_conf.update({
+                'sasl.mechanisms': sasl_mech,
+                'security.protocol': security_proto,
+                'sasl.kerberos.principal': principal,
+                'sasl.kerberos.keytab': keytab,
+                'sasl.kerberos.min.time.before.relogin': 6000,
+                'default.topic.config': {  # NOTE(review): only set when Kerberos is enabled — confirm that is intended
+                    'auto.commit.enable': 'true',
+                    'auto.commit.interval.ms': '60000',
+                    'auto.offset.reset': 'smallest'}
+            })
+
+            sn = os.environ.get('KAFKA_SERVICE_NAME')
+            if sn:
+                self._logger.info('Setting Kerberos service name: ' + sn)
+                connection_conf.update({'sasl.kerberos.service.name': sn})
+
+            kinit_cmd = os.environ.get('KAFKA_KINIT')
+            if kinit_cmd:
+                self._logger.info('using kinit command: ' + kinit_cmd)
+                connection_conf.update({'sasl.kerberos.kinit.cmd': kinit_cmd})
+            else:
+                # Using -S %{sasl.kerberos.service.name}/%{broker.name} causes the ticket cache to refresh
+                # resulting in authentication errors for other services
+                connection_conf.update({
+                    'sasl.kerberos.kinit.cmd': 'kinit -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}'
+                })
+
+        if config.ssl_enabled():
+            self._logger.info('Using SSL connection settings')
+            ssl_verify, ca_location, cert, key = config.ssl()
+            connection_conf.update({
+                'ssl.certificate.location': cert,
+                'ssl.ca.location': ca_location,
+                'ssl.key.location': key
+            })
+
+        return connection_conf
 
     def start(self):
-        
-        kafka_brokers = '{0}:{1}'.format(self._server,self._port)
-        consumer =  KC(bootstrap_servers=[kafka_brokers],group_id=self._topic)
-        partition = [TopicPartition(self._topic,int(self._id))]
-        consumer.assign(partitions=partition)
-        consumer.poll()
+
+        consumer = Consumer(**self._kafka_conf)
+        consumer.subscribe([self._topic])
         return consumer
 
     @property
@@ -132,6 +235,4 @@
 
     @property
     def ZookeperServer(self):
-        return "{0}:{1}".format(self._zk_server,self._zk_port)
-
-    
+        return "{0}:{1}".format(self._zk_server, self._zk_port)
diff --git a/spot-ingest/common/kafka_topic.sh b/spot-ingest/common/kafka_topic.sh
index ab95495..4c078c9 100755
--- a/spot-ingest/common/kafka_topic.sh
+++ b/spot-ingest/common/kafka_topic.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
diff --git a/spot-ingest/common/kerberos.py b/spot-ingest/common/kerberos.py
index 1cdca78..95baef9 100755
--- a/spot-ingest/common/kerberos.py
+++ b/spot-ingest/common/kerberos.py
@@ -17,31 +17,37 @@
 # limitations under the License.
 #
 
-import os
-import subprocess
 import sys
+import os
+import common.configurator as config
+from common.utils import Util
+
 
 class Kerberos(object):
-
     def __init__(self):
 
-        self._kinit =  os.getenv('KINITPATH')
-        self._kinitopts =  os.getenv('KINITOPTS')
-        self._keytab =  os.getenv('KEYTABPATH')
-        self._krb_user =  os.getenv('KRB_USER')
+        self._logger = Util.get_logger('SPOT.COMMON.KERBEROS')
+        principal, keytab, sasl_mech, security_proto = config.kerberos()
 
-        if self._kinit == None or self._kinitopts == None or self._keytab == None or self._krb_user == None:
-            print "Please verify kerberos configuration, some environment variables are missing."
+        if os.getenv('KINITPATH'):
+            self._kinit = os.getenv('KINITPATH')
+        else:
+            self._kinit = "kinit"
+
+        self._kinitopts = os.getenv('KINITOPTS')
+        self._keytab = "-kt {0}".format(keytab)
+        self._krb_user = principal
+
+        if self._kinit == None or self._keytab == None or self._krb_user == None:
+            self._logger.error("Please verify kerberos configuration, some environment variables are missing.")
             sys.exit(1)
 
-        self._kinit_args = [self._kinit,self._kinitopts,self._keytab,self._krb_user]
+        if self._kinitopts is None:
+            self._kinit_cmd = "{0} {1} {2}".format(self._kinit, self._keytab, self._krb_user)
+        else:
+            self._kinit_cmd = "{0} {1} {2} {3}".format(self._kinit, self._kinitopts, self._keytab, self._krb_user)
 
-	def authenticate(self):
+    def authenticate(self):
 
-		kinit = subprocess.Popen(self._kinit_args, stderr = subprocess.PIPE)
-		output,error = kinit.communicate()
-		if not kinit.returncode == 0:
-			if error:
-				print error.rstrip()
-				sys.exit(kinit.returncode)
-		print "Successfully authenticated!"
+        Util.execute_cmd(self._kinit_cmd, self._logger)
+        self._logger.info("Kerberos ticket obtained")
diff --git a/spot-ingest/kerberos-requirements.txt b/spot-ingest/kerberos-requirements.txt
new file mode 100644
index 0000000..ae5ea26
--- /dev/null
+++ b/spot-ingest/kerberos-requirements.txt
@@ -0,0 +1,4 @@
+thrift_sasl==0.2.1
+sasl
+hdfs[kerberos]
+requests-kerberos
\ No newline at end of file
diff --git a/spot-ingest/master_collector.py b/spot-ingest/master_collector.py
index 9cd91ea..23be9f4 100755
--- a/spot-ingest/master_collector.py
+++ b/spot-ingest/master_collector.py
@@ -21,70 +21,81 @@
 import os
 import json
 import sys
+import datetime
 from common.utils import Util
 from common.kerberos import Kerberos
-from common.kafka_client import KafkaTopic
-import datetime 
+import common.configurator as Config
+from common.kafka_client import KafkaProducer
 
 # get master configuration.
-script_path = os.path.dirname(os.path.abspath(__file__))
-conf_file = "{0}/ingest_conf.json".format(script_path)
-master_conf = json.loads(open (conf_file).read())
+SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
+CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
+MASTER_CONF = json.loads(open(CONF_FILE).read())
+
 
 def main():
 
     # input Parameters
     parser = argparse.ArgumentParser(description="Master Collector Ingest Daemon")
-    parser.add_argument('-t','--type',dest='type',required=True,help='Type of data that will be ingested (Pipeline Configuration)',metavar='')
-    parser.add_argument('-w','--workers',dest='workers_num',required=True,help='Number of workers for the ingest process',metavar='')
-    parser.add_argument('-id','--ingestId',dest='ingest_id',required=False,help='Ingest ID',metavar='')
+    parser.add_argument('-t', '--type', dest='type', required=True,
+                        help='Type of data that will be ingested (Pipeline Configuration)',
+                        metavar='')
+    parser.add_argument('-w', '--workers', dest='workers_num',
+                        required=True, help='Number of workers for the ingest process',
+                        metavar='')
+    parser.add_argument('-id', '--ingestId', dest='ingest_id',
+                        required=False, help='Ingest ID', metavar='')
     args = parser.parse_args()
 
     # start collector based on data source type.
-    start_collector(args.type,args.workers_num,args.ingest_id)
+    start_collector(args.type, args.workers_num, args.ingest_id)
 
-def start_collector(type,workers_num,id=None):
+
+def start_collector(type, workers_num, id=None):
 
     # generate ingest id
-    ingest_id = str(datetime.datetime.time(datetime.datetime.now())).replace(":","_").replace(".","_")
-    
+    ingest_id = str(datetime.datetime.time(datetime.datetime.now())).replace(":", "_").replace(".", "_")
+
     # create logger.
     logger = Util.get_logger("SPOT.INGEST")
 
     # validate the given configuration exists in ingest_conf.json.
-    if not type in master_conf["pipelines"]:
-        logger.error("'{0}' type is not a valid configuration.".format(type));
+    if not type in MASTER_CONF["pipelines"]:
+        logger.error("'{0}' type is not a valid configuration.".format(type))
         sys.exit(1)
 
     # validate the type is a valid module.
-    if not Util.validate_data_source(master_conf["pipelines"][type]["type"]):
-        logger.error("'{0}' type is not configured. Please check you ingest conf file".format(master_conf["pipelines"][type]["type"]));
+    if not Util.validate_data_source(MASTER_CONF["pipelines"][type]["type"]):
+        logger.error("'{0}' type is not configured. Please check you ingest conf file".format(MASTER_CONF["pipelines"][type]["type"]))
         sys.exit(1)
-    
+
     # validate if kerberos authentication is required.
-    if os.getenv('KRB_AUTH'):
+    if Config.kerberos_enabled():
         kb = Kerberos()
         kb.authenticate()
-    
+
     # kafka server info.
     logger.info("Initializing kafka instance")
-    k_server = master_conf["kafka"]['kafka_server']
-    k_port = master_conf["kafka"]['kafka_port']
+    k_server = MASTER_CONF["kafka"]['kafka_server']
+    k_port = MASTER_CONF["kafka"]['kafka_port']
 
     # required zookeeper info.
-    zk_server = master_conf["kafka"]['zookeper_server']
-    zk_port = master_conf["kafka"]['zookeper_port']
+    zk_server = MASTER_CONF["kafka"]['zookeper_server']
+    zk_port = MASTER_CONF["kafka"]['zookeper_port']
          
-    topic = "SPOT-INGEST-{0}_{1}".format(type,ingest_id) if not id else id
-    kafka = KafkaTopic(topic,k_server,k_port,zk_server,zk_port,workers_num)
+    topic = "{0}".format(type,ingest_id) if not id else id
+    producer = KafkaProducer(topic, k_server, k_port, zk_server, zk_port, workers_num)
 
     # create a collector instance based on data source type.
     logger.info("Starting {0} ingest instance".format(topic))
-    module = __import__("pipelines.{0}.collector".format(master_conf["pipelines"][type]["type"]),fromlist=['Collector'])
+    module = __import__("pipelines.{0}.collector".
+                        format(MASTER_CONF["pipelines"][type]["type"]),
+                        fromlist=['Collector'])
 
     # start collector.
-    ingest_collector = module.Collector(master_conf['hdfs_app_path'],kafka,type)
+    ingest_collector = module.Collector(MASTER_CONF['hdfs_app_path'], producer, type)
     ingest_collector.start()
 
-if __name__=='__main__':
+
+if __name__ == '__main__':
     main()
diff --git a/spot-ingest/pipelines/dns/collector.py b/spot-ingest/pipelines/dns/collector.py
index c421c47..97c5ed6 100755
--- a/spot-ingest/pipelines/dns/collector.py
+++ b/spot-ingest/pipelines/dns/collector.py
@@ -18,26 +18,29 @@
 #
 
 import time
-import os
-import subprocess
-import json
 import logging
+import os
+import json
 from multiprocessing import Process
 from common.utils import Util
+from common import hdfs_client as hdfs
+from common.hdfs_client import HdfsException
 from common.file_collector import FileWatcher
 from multiprocessing import Pool
-from common.kafka_client import KafkaTopic
+
 
 class Collector(object):
 
-    def __init__(self, hdfs_app_path, kafka_topic, conf_type):
-        self._initialize_members(hdfs_app_path, kafka_topic, conf_type)
+    def __init__(self, hdfs_app_path, kafkaproducer, conf_type):
 
-    def _initialize_members(self, hdfs_app_path, kafka_topic, conf_type):
+        self._initialize_members(hdfs_app_path, kafkaproducer, conf_type)
+
+    def _initialize_members(self, hdfs_app_path, kafkaproducer, conf_type):
+
         # getting parameters.
         self._logger = logging.getLogger('SPOT.INGEST.DNS')
         self._hdfs_app_path = hdfs_app_path
-        self._kafka_topic = kafka_topic
+        self._producer = kafkaproducer
 
         # get script path
         self._script_path = os.path.dirname(os.path.abspath(__file__))
@@ -64,6 +67,8 @@
         self._processes = conf["collector_processes"]
         self._ingestion_interval = conf["ingestion_interval"]
         self._pool = Pool(processes=self._processes)
+        # TODO: review re-use of hdfs.client
+        self._hdfs_client = hdfs.get_client()
 
     def start(self):
 
@@ -74,74 +79,108 @@
             while True:
                 self._ingest_files_pool()
                 time.sleep(self._ingestion_interval)
-
         except KeyboardInterrupt:
             self._logger.info("Stopping DNS collector...")
-            Util.remove_kafka_topic(self._kafka_topic.Zookeeper, self._kafka_topic.Topic, self._logger)
+            Util.remove_kafka_topic(self._producer.Zookeeper, self._producer.Topic, self._logger)
             self._watcher.stop()
             self._pool.terminate()
             self._pool.close()
             self._pool.join()
             SystemExit("Ingest finished...")
 
-
     def _ingest_files_pool(self):
+
         if self._watcher.HasFiles:
+
             for x in range(0, self._processes):
-                file = self._watcher.GetNextFile()
-                resutl = self._pool.apply_async(ingest_file, args=(file, self._pkt_num, self._pcap_split_staging, self._kafka_topic.Partition, self._hdfs_root_path, self._kafka_topic.Topic, self._kafka_topic.BootstrapServers, ))
-                #resutl.get() # to debug add try and catch.
-                if  not self._watcher.HasFiles: break    
+                self._logger.info('processes: {0}'.format(self._processes))
+                new_file = self._watcher.GetNextFile()
+                if self._processes <= 1:
+                    _ingest_file(
+                        self._hdfs_client,
+                        new_file,
+                        self._pkt_num,
+                        self._pcap_split_staging,
+                        self._hdfs_root_path,
+                        self._producer,
+                        self._producer.Topic
+                        )
+                else:
+                    resutl = self._pool.apply_async(_ingest_file, args=(
+                        self._hdfs_client,
+                        new_file,
+                        self._pkt_num,
+                        self._pcap_split_staging,
+                        self._hdfs_root_path,
+                        self._producer,
+                        self._producer.Topic
+                        ))
+                # resutl.get() # to debug add try and catch.
+                if not self._watcher.HasFiles:
+                    break
         return True
 
-def ingest_file(file,pkt_num,pcap_split_staging, partition,hdfs_root_path,topic,kafka_servers):
+
+def _ingest_file(hdfs_client, new_file, pkt_num, pcap_split_staging, hdfs_root_path, producer, topic):
 
     logger = logging.getLogger('SPOT.INGEST.DNS.{0}'.format(os.getpid()))
     
     try:
         # get file name and date.
-        org_file = file
-        file_name_parts = file.split('/')
+        org_file = new_file
+        file_name_parts = new_file.split('/')
         file_name = file_name_parts[len(file_name_parts)-1]
 
         # split file.
         name = file_name.split('.')[0]
-        split_cmd = "editcap -c {0} {1} {2}/{3}_spot.pcap".format(pkt_num,file,pcap_split_staging,name)
+        split_cmd = "editcap -c {0} {1} {2}/{3}_spot.pcap".format(pkt_num,
+                                                                  new_file,
+                                                                  pcap_split_staging,
+                                                                  name)
         logger.info("Splitting file: {0}".format(split_cmd))
         Util.execute_cmd(split_cmd,logger)
 
         logger.info("Removing file: {0}".format(org_file))
         rm_big_file = "rm {0}".format(org_file)
-        Util.execute_cmd(rm_big_file,logger)    
+        Util.execute_cmd(rm_big_file,logger)
 
-        for currdir,subdir,files in os.walk(pcap_split_staging):
-            for file in files:
-                if file.endswith(".pcap") and "{0}_spot".format(name) in file:
-
-                        # get timestamp from the file name to build hdfs path.
-                        file_date = file.split('.')[0]
-                        pcap_hour = file_date[-6:-4]
-                        pcap_date_path = file_date[-14:-6]
-
-                        # hdfs path with timestamp.
-                        hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path,pcap_date_path,pcap_hour)
-
-                        # create hdfs path.
-                        Util.creat_hdfs_folder(hdfs_path,logger)
-
-                        # load file to hdfs.
-                        hadoop_pcap_file = "{0}/{1}".format(hdfs_path,file)
-                        Util.load_to_hdfs(os.path.join(currdir,file),hadoop_pcap_file,logger)
-
-                        # create event for workers to process the file.
-                        logger.info( "Sending split file to worker number: {0}".format(partition))
-                        KafkaTopic.SendMessage(hadoop_pcap_file,kafka_servers,topic,partition)
-                        logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
-
-
-  
     except Exception as err:
-        
-        logger.error("There was a problem, please check the following error message:{0}".format(err.message))
+        logger.error("There was a problem splitting the file: {0}".format(err.message))
         logger.error("Exception: {0}".format(err))
 
+    for currdir, subdir, files in os.walk(pcap_split_staging):
+        for file in files:
+            if file.endswith(".pcap") and "{0}_spot".format(name) in file:
+                # get timestamp from the file name to build hdfs path.
+                file_date = file.split('.')[0]
+                pcap_hour = file_date[-6:-4]
+                pcap_date_path = file_date[-14:-6]
+
+                # hdfs path with timestamp.
+                hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path, pcap_date_path, pcap_hour)
+
+                # create hdfs path.
+                try:
+                    if len(hdfs.list_dir(hdfs_path, hdfs_client)) == 0:
+                        logger.info('creating directory: ' + hdfs_path)
+                        hdfs.mkdir(hdfs_path, hdfs_client)  # module helper, same calling form as hdfs.list_dir above
+
+                    # load file to hdfs.
+                    hadoop_pcap_file = "{0}/{1}".format(hdfs_path,file)
+                    result = hdfs_client.upload_file(hadoop_pcap_file, os.path.join(currdir,file))  # NOTE(review): confirm the client object exposes upload_file; siblings call hdfs.upload_file
+                    if not result:
+                        logger.error('File failed to upload: ' + hadoop_pcap_file)
+                        raise HdfsException
+
+                    # create event for workers to process the file.
+                    logger.info( "Sending split file to Topic: {0}".format(topic))
+                    producer.SendMessage(hadoop_pcap_file, topic)
+                    logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
+
+                except HdfsException as err:
+                    logger.error('Exception: ' + err.exception)
+                    logger.info('Check Hdfs Connection settings and server health')
+
+                except Exception as err:
+                    logger.info("File {0} failed to be sent to Kafka Topic to: {1}".format(new_file,topic))
+                    logger.error("Error: {0}".format(err))
\ No newline at end of file
diff --git a/spot-ingest/pipelines/dns/worker.py b/spot-ingest/pipelines/dns/worker.py
index 6f51f45..f23fa8f 100755
--- a/spot-ingest/pipelines/dns/worker.py
+++ b/spot-ingest/pipelines/dns/worker.py
@@ -21,18 +21,22 @@
 import datetime
 import subprocess
 import json
+import sys
 import os
 from multiprocessing import Process
 from common.utils import Util
+from common import hive_engine
+from common import hdfs_client as hdfs
+from confluent_kafka import KafkaError, KafkaException
 
 
 class Worker(object):
 
-    def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes=None):
+    def __init__(self, db_name, hdfs_app_path, kafka_consumer, conf_type, processes=None):
         
-        self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type)
+        self._initialize_members(db_name,hdfs_app_path, kafka_consumer, conf_type)
 
-    def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
+    def _initialize_members(self, db_name, hdfs_app_path, kafka_consumer, conf_type):
 
         # get logger instance.
         self._logger = Util.get_logger('SPOT.INGEST.WRK.DNS')
@@ -44,32 +48,58 @@
         self._script_path = os.path.dirname(os.path.abspath(__file__))
         conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path)))
         conf = json.loads(open(conf_file).read())
-        self._conf = conf["pipelines"][conf_type] 
+        self._conf = conf["pipelines"][conf_type]
+        self._id = "spot-{0}-worker".format(conf_type)
 
         self._process_opt = self._conf['process_opt']
         self._local_staging = self._conf['local_staging']
         self.kafka_consumer = kafka_consumer
 
+        self._cursor = hive_engine.create_connection()
+
     def start(self):
 
         self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
-        for message in self.kafka_consumer.start():
-            self._new_file(message.value)
+        consumer = self.kafka_consumer.start()
 
-    def _new_file(self,file):
+        try:
+            while True:
+                message = consumer.poll(timeout=1.0)
+                if message is None:
+                    continue
+                if not message.error():
+                    self._new_file(message.value().decode('utf-8'))
+                else:
+                    if message.error().code() == KafkaError._PARTITION_EOF:
+                        continue  # end of partition is informational, keep polling
+                    else:
+                        raise KafkaException(message.error())
 
-        self._logger.info("-------------------------------------- New File received --------------------------------------")
+        except KeyboardInterrupt:
+            sys.stderr.write('%% Aborted by user\n')
+
+        consumer.close()
+
+    def _new_file(self, nf):
+
+        self._logger.info(
+            "-------------------------------------- New File received --------------------------------------"
+        )
-        self._logger.info("File: {0} ".format(file))        
-        p = Process(target=self._process_new_file, args=(file,))
+        self._logger.info("File: {0} ".format(nf))
+        p = Process(target=self._process_new_file, args=(nf, ))  # args must be a tuple; a bare str is unpacked per character
         p.start() 
         p.join()
 
-    def _process_new_file(self,file):
+    def _process_new_file(self, nf):
+
 
         # get file from hdfs
-        get_file_cmd = "hadoop fs -get {0} {1}.".format(file,self._local_staging)
-        self._logger.info("Getting file from hdfs: {0}".format(get_file_cmd))
-        Util.execute_cmd(get_file_cmd,self._logger)
+        self._logger.info("Getting file from hdfs: {0}".format(nf))
+        if hdfs.file_exists(nf):
+            hdfs.download_file(nf, self._local_staging)
+        else:
+            self._logger.info("file: {0} not found".format(nf))
+            # TODO: error handling
 
         # get file name and date
-        file_name_parts = file.split('/')
+        file_name_parts = nf.split('/')
@@ -82,37 +112,86 @@
         binary_day = binary_date_path[6:8]
 
         # build process cmd.
-        process_cmd = "tshark -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging,file_name,self._process_opt)
+        process_cmd = "tshark -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging, file_name, self._process_opt)
         self._logger.info("Processing file: {0}".format(process_cmd))
-        Util.execute_cmd(process_cmd,self._logger)
+        Util.execute_cmd(process_cmd, self._logger)
 
         # create hdfs staging.
         hdfs_path = "{0}/dns".format(self._hdfs_app_path)
         staging_timestamp = datetime.datetime.now().strftime('%M%S%f')[:-4]
         hdfs_staging_path =  "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
-        create_staging_cmd = "hadoop fs -mkdir -p {0}".format(hdfs_staging_path)
-        self._logger.info("Creating staging: {0}".format(create_staging_cmd))
-        Util.execute_cmd(create_staging_cmd,self._logger)
+        self._logger.info("Creating staging: {0}".format(hdfs_staging_path))
+        hdfs.mkdir(hdfs_staging_path)
 
         # move to stage.
-        mv_to_staging ="hadoop fs -moveFromLocal {0}{1}.csv {2}/.".format(self._local_staging,file_name,hdfs_staging_path)
-        self._logger.info("Moving data to staging: {0}".format(mv_to_staging))
-        Util.execute_cmd(mv_to_staging,self._logger)
+        local_file = "{0}{1}.csv".format(self._local_staging, file_name)
+        self._logger.info("Moving data to staging: {0}".format(hdfs_staging_path))
+        hdfs.upload_file(hdfs_staging_path, local_file)
 
         #load to avro
-        load_to_avro_cmd = "hive -hiveconf dbname={0} -hiveconf y={1} -hiveconf m={2} -hiveconf d={3} -hiveconf h={4} -hiveconf data_location='{5}' -f pipelines/dns/load_dns_avro_parquet.hql".format(self._db_name,binary_year,binary_month,binary_day,binary_hour,hdfs_staging_path)
+        drop_table = 'DROP TABLE IF EXISTS {0}.dns_tmp'.format(self._db_name)
+        self._cursor.execute(drop_table)
 
-        self._logger.info("Loading data to hive: {0}".format(load_to_avro_cmd))
-        Util.execute_cmd(load_to_avro_cmd,self._logger)
+        # Create external table
+        create_external = ("\n"
+                           "CREATE EXTERNAL TABLE {0}.dns_tmp (\n"
+                           "  frame_day STRING,\n"
+                           "  frame_time STRING,\n"
+                           "  unix_tstamp BIGINT,\n"
+                           "  frame_len INT,\n"
+                           "  ip_src STRING,\n"
+                           "  ip_dst STRING,\n"
+                           "  dns_qry_name STRING,\n"
+                           "  dns_qry_type INT,\n"
+                           "  dns_qry_class STRING,\n"
+                           "  dns_qry_rcode INT,\n"
+                           "  dns_a STRING  \n"
+                           "  )\n"
+                           "  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
+                           "  STORED AS TEXTFILE\n"
+                           "  LOCATION '{1}'\n"
+                           "  TBLPROPERTIES ('avro.schema.literal'='{{\n"
+                           "  \"type\":   \"record\"\n"
+                           "  , \"name\":   \"RawDnsRecord\"\n"
+                           "  , \"namespace\" : \"com.cloudera.accelerators.dns.avro\"\n"
+                           "  , \"fields\": [\n"
+                           "      {{\"name\": \"frame_day\",        \"type\":[\"string\", \"null\"]}}\n"
+                           "      , {{\"name\": \"frame_time\",     \"type\":[\"string\", \"null\"]}}\n"
+                           "      , {{\"name\": \"unix_tstamp\",    \"type\":[\"bigint\", \"null\"]}}\n"
+                           "      , {{\"name\": \"frame_len\",      \"type\":[\"int\",    \"null\"]}}\n"
+                           "      , {{\"name\": \"ip_src\",         \"type\":[\"string\", \"null\"]}}\n"
+                           "      , {{\"name\": \"ip_dst\",         \"type\":[\"string\", \"null\"]}}\n"
+                           "      , {{\"name\": \"dns_qry_name\",   \"type\":[\"string\", \"null\"]}}\n"
+                           "      , {{\"name\": \"dns_qry_type\",   \"type\":[\"int\",    \"null\"]}}\n"
+                           "      , {{\"name\": \"dns_qry_class\",  \"type\":[\"string\", \"null\"]}}\n"
+                           "      , {{\"name\": \"dns_qry_rcode\",  \"type\":[\"int\",    \"null\"]}}\n"
+                           "      , {{\"name\": \"dns_a\",          \"type\":[\"string\", \"null\"]}}\n"
+                           "      ]\n"
+                           "}}')\n"
+                           ).format(self._db_name, hdfs_staging_path)  # literal JSON braces are doubled so str.format leaves them alone
+        self._logger.info( "Creating external table: {0}".format(create_external))
+        self._cursor.execute(create_external)
+
+        # Insert data
+        insert_into_table = """
+            INSERT INTO TABLE {0}.dns
+            PARTITION (y={1}, m={2}, d={3}, h={4})
+            SELECT   CONCAT(frame_day , frame_time) as treceived, unix_tstamp, frame_len, ip_dst, ip_src, dns_qry_name,
+            dns_qry_class,dns_qry_type, dns_qry_rcode, dns_a 
+            FROM {0}.dns_tmp
+        """.format(self._db_name,binary_year,binary_month,binary_day,binary_hour)
+        self._logger.info( "Loading data to {0}: {1}"
+                           .format(self._db_name, insert_into_table)
+                           )
+        self._cursor.execute(insert_into_table)
 
         # remove from hdfs staging
-        rm_hdfs_staging_cmd = "hadoop fs -rm -R -skipTrash {0}".format(hdfs_staging_path)
-        self._logger.info("Removing staging path: {0}".format(rm_hdfs_staging_cmd))
-        Util.execute_cmd(rm_hdfs_staging_cmd,self._logger)
+        self._logger.info("Removing staging path: {0}".format(hdfs_staging_path))
+        hdfs.delete_folder(hdfs_staging_path)
 
         # remove from local staging.
         rm_local_staging = "rm {0}{1}".format(self._local_staging,file_name)
         self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
-        Util.execute_cmd(rm_local_staging,self._logger)
+        Util.execute_cmd(rm_local_staging, self._logger)
 
         self._logger.info("File {0} was successfully processed.".format(file_name))
diff --git a/spot-ingest/pipelines/flow/collector.py b/spot-ingest/pipelines/flow/collector.py
index b9a97f2..5e5cd49 100755
--- a/spot-ingest/pipelines/flow/collector.py
+++ b/spot-ingest/pipelines/flow/collector.py
@@ -23,22 +23,24 @@
 import json
 from multiprocessing import Process
 from common.utils import Util
+from common import hdfs_client as hdfs
+from common.hdfs_client import HdfsException
 from common.file_collector import FileWatcher
 from multiprocessing import Pool
-from common.kafka_client import KafkaTopic
+
 
 class Collector(object):
 
-    def __init__(self,hdfs_app_path,kafka_topic,conf_type):
+    def __init__(self, hdfs_app_path, kafkaproducer, conf_type):
         
-        self._initialize_members(hdfs_app_path,kafka_topic,conf_type)
+        self._initialize_members(hdfs_app_path, kafkaproducer, conf_type)
 
-    def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type):
-  
+    def _initialize_members(self, hdfs_app_path, kafkaproducer, conf_type):
+
         # getting parameters.
         self._logger = logging.getLogger('SPOT.INGEST.FLOW')
         self._hdfs_app_path = hdfs_app_path
-        self._kafka_topic = kafka_topic
+        self._producer = kafkaproducer
 
         # get script path
         self._script_path = os.path.dirname(os.path.abspath(__file__))
@@ -62,6 +64,8 @@
         self._processes = conf["collector_processes"]
         self._ingestion_interval = conf["ingestion_interval"]
         self._pool = Pool(processes=self._processes)
+        # TODO: review re-use of hdfs.client
+        self._hdfs_client = hdfs.get_client()
 
     def start(self):
 
@@ -74,54 +78,83 @@
                 time.sleep(self._ingestion_interval)
         except KeyboardInterrupt:
             self._logger.info("Stopping FLOW collector...")  
-            Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger)          
+            Util.remove_kafka_topic(self._producer.Zookeeper, self._producer.Topic, self._logger)
             self._watcher.stop()
             self._pool.terminate()
             self._pool.close()            
             self._pool.join()
             SystemExit("Ingest finished...")
-    
 
     def _ingest_files_pool(self):            
        
         if self._watcher.HasFiles:
             
-            for x in range(0,self._processes):
-                file = self._watcher.GetNextFile()
-                resutl = self._pool.apply_async(ingest_file,args=(file,self._kafka_topic.Partition,self._hdfs_root_path ,self._kafka_topic.Topic,self._kafka_topic.BootstrapServers,))
-                #resutl.get() # to debug add try and catch.
-                if  not self._watcher.HasFiles: break    
+            for x in range(0, self._processes):
+                self._logger.info('processes: {0}'.format(self._processes))
+                new_file = self._watcher.GetNextFile()
+                if self._processes <= 1:
+                    _ingest_file(
+                                 new_file,
+                                 self._hdfs_root_path,
+                                 self._producer,
+                                 self._producer.Topic
+                                 )
+                else:
+                    result = self._pool.apply_async(_ingest_file, args=(
+                        new_file,
+                        self._hdfs_root_path,
+                        self._producer,
+                        self._producer.Topic
+                    ))
+                    # result.get()  # to debug add try and catch.
+                if not self._watcher.HasFiles:
+                    break
         return True
-    
 
 
-def ingest_file(file,partition,hdfs_root_path,topic,kafka_servers):
+def _ingest_file(new_file, hdfs_root_path, producer, topic):
 
-        logger = logging.getLogger('SPOT.INGEST.FLOW.{0}'.format(os.getpid()))
+    logger = logging.getLogger('SPOT.INGEST.FLOW.{0}'.format(os.getpid()))
+
+    try:
+
+        # get file name and date.
+        file_name_parts = new_file.split('/')
+        file_name = file_name_parts[len(file_name_parts)-1]
+        file_date = file_name.split('.')[1]
+        file_date_path = file_date[0:8]
+        file_date_hour = file_date[8:10]
+
+        # hdfs path with timestamp.
+        hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path, file_date_path, file_date_hour)
+        hdfs_file = "{0}/{1}".format(hdfs_path, file_name)
 
         try:
+            if len(hdfs.list_dir(hdfs_path)) == 0:
+                logger.info('creating directory: ' + hdfs_path)
+                hdfs.mkdir(hdfs_path)
+            logger.info('uploading file to hdfs: ' + hdfs_file)
+            result = hdfs.upload_file(hdfs_path, new_file)
+            if not result:
+                logger.error('File failed to upload: ' + hdfs_file)
+                raise HdfsException
+            else:
+                rm_file = "rm {0}".format(new_file)
+                logger.info("Removing files from local staging: {0}".format(rm_file))
+                Util.execute_cmd(rm_file, logger)
 
-            # get file name and date.
-            file_name_parts = file.split('/')
-            file_name = file_name_parts[len(file_name_parts)-1]
-            file_date = file_name.split('.')[1]
+        except HdfsException as err:
+            logger.error('Exception: ' + err.exception)
+            logger.info('Check Hdfs Connection settings and server health')
 
-            file_date_path = file_date[0:8]
-            file_date_hour = file_date[8:10]
+    except Exception as err:
+        logger.error("There was a problem, Exception: {0}".format(err))
 
-            # hdfs path with timestamp.
-            hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path,file_date_path,file_date_hour)
-            Util.creat_hdfs_folder(hdfs_path,logger)
-
-            # load to hdfs.
-            hdfs_file = "{0}/{1}".format(hdfs_path,file_name)
-            Util.load_to_hdfs(file,hdfs_file,logger)
-
-            # create event for workers to process the file.
-            logger.info("Sending file to worker number: {0}".format(partition))
-            KafkaTopic.SendMessage(hdfs_file,kafka_servers,topic,partition)    
-            logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
-
-        except Exception as err:
-            logger.error("There was a problem, please check the following error message:{0}".format(err.message))
-            logger.error("Exception: {0}".format(err))
+        # create event for workers to process the file.
+        # logger.info("Sending file to worker number: {0}".format(partition))
+    try:
+        producer.SendMessage(hdfs_file, topic)
+        logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(hdfs_file, topic))
+    except Exception as err:
+        logger.info("File {0} failed to be sent to Kafka Topic to: {1}".format(hdfs_file, topic))
+        logger.error("Error: {0}".format(err))
diff --git a/spot-ingest/pipelines/flow/worker.py b/spot-ingest/pipelines/flow/worker.py
index 1630022..bb957a5 100755
--- a/spot-ingest/pipelines/flow/worker.py
+++ b/spot-ingest/pipelines/flow/worker.py
@@ -22,17 +22,20 @@
 import datetime
 import logging
 import os
-import json 
+import json
 from multiprocessing import Process
 from common.utils import Util
+from common import hive_engine
+from common import hdfs_client as hdfs
+from confluent_kafka import KafkaError, KafkaException
 
 
 class Worker(object):
 
-    def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes=None):
-        self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type)
+    def __init__(self, db_name, hdfs_app_path, kafka_consumer, conf_type, processes=None):
+        self._initialize_members(db_name, hdfs_app_path, kafka_consumer, conf_type)
 
-    def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
+    def _initialize_members(self, db_name, hdfs_app_path, kafka_consumer, conf_type):
 
         # get logger instance.
         self._logger = Util.get_logger('SPOT.INGEST.WRK.FLOW')
@@ -45,76 +48,186 @@
         conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path)))
         conf = json.loads(open(conf_file).read())
         self._conf = conf["pipelines"][conf_type]
+        self._id = "spot-{0}-worker".format(conf_type)
 
         self._process_opt = self._conf['process_opt']
         self._local_staging = self._conf['local_staging']
         self.kafka_consumer = kafka_consumer
 
+        # self._cursor = hive_engine.create_connection()
+        self._cursor = hive_engine
+
     def start(self):
 
         self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
-        for message in self.kafka_consumer.start():
-            self._new_file(message.value)
+        consumer = self.kafka_consumer.start()
+        try:
+            while True:
+                message = consumer.poll(timeout=1.0)
+                if message is None:
+                    continue
+                if not message.error():
+                    self._new_file(message.value().decode('utf-8'))
+                elif message.error():
+                    if message.error().code() == KafkaError._PARTITION_EOF:
+                        continue
+                    elif message.error:
+                        raise KafkaException(message.error())
 
-    def _new_file(self,file):
+        except KeyboardInterrupt:
+            sys.stderr.write('%% Aborted by user\n')
 
-        self._logger.info("-------------------------------------- New File received --------------------------------------")
-        self._logger.info("File: {0} ".format(file))        
-        p = Process(target=self._process_new_file, args=(file,))
+        consumer.close()
+
+    def _new_file(self, nf):
+
+        self._logger.info(
+            "-------------------------------------- New File received --------------------------------------"
+        )
+        self._logger.info("File: {0} ".format(nf))
+
+        p = Process(target=self._process_new_file, args=(nf, ))
         p.start()
         p.join()
         
-    def _process_new_file(self,file):
-
-        # get file from hdfs
-        get_file_cmd = "hadoop fs -get {0} {1}.".format(file,self._local_staging)
-        self._logger.info("Getting file from hdfs: {0}".format(get_file_cmd))
-        Util.execute_cmd(get_file_cmd,self._logger)
+    def _process_new_file(self, nf):
 
         # get file name and date
-        file_name_parts = file.split('/')
+        file_name_parts = nf.split('/')
         file_name = file_name_parts[len(file_name_parts)-1]
-
+        nf_path = nf.rstrip(file_name)
         flow_date = file_name.split('.')[1]
         flow_year = flow_date[0:4]
         flow_month = flow_date[4:6]
         flow_day = flow_date[6:8]
         flow_hour = flow_date[8:10]
 
+        # get file from hdfs
+        if hdfs.file_exists(nf_path, file_name):
+            self._logger.info("Getting file from hdfs: {0}".format(nf))
+            hdfs.download_file(nf, self._local_staging)
+        else:
+            self._logger.info("file: {0} not found".format(nf))
+            # TODO: error handling
+
         # build process cmd.
-        process_cmd = "nfdump -o csv -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging,file_name,self._process_opt)
+        sf = "{0}{1}.csv".format(self._local_staging,file_name)
+        process_cmd = "nfdump -o csv -r {0}{1} {2} > {3}".format(self._local_staging, file_name, self._process_opt, sf)
         self._logger.info("Processing file: {0}".format(process_cmd))
-        Util.execute_cmd(process_cmd,self._logger)        
+        Util.execute_cmd(process_cmd,self._logger)
 
         # create hdfs staging.
         hdfs_path = "{0}/flow".format(self._hdfs_app_path)
         staging_timestamp = datetime.datetime.now().strftime('%M%S%f')[:-4]
-        hdfs_staging_path =  "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
-        create_staging_cmd = "hadoop fs -mkdir -p {0}".format(hdfs_staging_path)
-        self._logger.info("Creating staging: {0}".format(create_staging_cmd))
-        Util.execute_cmd(create_staging_cmd,self._logger)
+        hdfs_staging_path = "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
+        self._logger.info("Creating staging: {0}".format(hdfs_staging_path))
+        hdfs.mkdir(hdfs_staging_path)
 
         # move to stage.
-        mv_to_staging ="hadoop fs -moveFromLocal {0}{1}.csv {2}/.".format(self._local_staging,file_name,hdfs_staging_path)
-        self._logger.info("Moving data to staging: {0}".format(mv_to_staging))
-        subprocess.call(mv_to_staging,shell=True)
+        local_file = "{0}{1}.csv".format(self._local_staging, file_name)
+        self._logger.info("Moving data to staging: {0}".format(hdfs_staging_path))
+        hdfs.upload_file(hdfs_staging_path, local_file)
 
-        #load to avro
-        load_to_avro_cmd = "hive -hiveconf dbname={0} -hiveconf y={1} -hiveconf m={2} -hiveconf d={3} -hiveconf h={4} -hiveconf data_location='{5}' -f pipelines/flow/load_flow_avro_parquet.hql".format(self._db_name,flow_year,flow_month,flow_day,flow_hour,hdfs_staging_path)
+        # load with impyla
+        drop_table = "DROP TABLE IF EXISTS {0}.flow_tmp".format(self._db_name)
+        self._logger.info( "Dropping temp table: {0}".format(drop_table))
+        self._cursor.execute_query(drop_table)
 
-        self._logger.info( "Loading data to hive: {0}".format(load_to_avro_cmd))
-        Util.execute_cmd(load_to_avro_cmd,self._logger)
+        create_external = ("\n"
+                           "CREATE EXTERNAL TABLE {0}.flow_tmp (\n"
+                           "  treceived STRING,\n"
+                           "  tryear INT,\n"
+                           "  trmonth INT,\n"
+                           "  trday INT,\n"
+                           "  trhour INT,\n"
+                           "  trminute INT,\n"
+                           "  trsec INT,\n"
+                           "  tdur FLOAT,\n"
+                           "  sip  STRING,\n"
+                           "  dip STRING,\n"
+                           "  sport INT,\n"
+                           "  dport INT,\n"
+                           "  proto STRING,\n"
+                           "  flag STRING,\n"
+                           "  fwd INT,\n"
+                           "  stos INT,\n"
+                           "  ipkt BIGINT,\n"
+                           "  ibyt BIGINT,\n"
+                           "  opkt BIGINT,\n"
+                           "  obyt BIGINT,\n"
+                           "  input INT,\n"
+                           "  output INT,\n"
+                           "  sas INT,\n"
+                           "  das INT,\n"
+                           "  dtos INT,\n"
+                           "  dir INT,\n"
+                           "  rip STRING\n"
+                           "  )\n"
+                           "  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
+                           "  STORED AS TEXTFILE\n"
+                           "  LOCATION '{1}'\n"
+                           "  TBLPROPERTIES ('avro.schema.literal'='{{\n"
+                           "  \"type\":   \"record\"\n"
+                           "  , \"name\":   \"RawFlowRecord\"\n"
+                           "  , \"namespace\" : \"com.cloudera.accelerators.flows.avro\"\n"
+                           "  , \"fields\": [\n"
+                           "      {{\"name\": \"treceived\",             \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"tryear\",              \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trmonth\",             \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trday\",               \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trhour\",              \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trminute\",            \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trsec\",               \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"tdur\",                \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"sip\",                \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"sport\",                 \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dip\",                \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dport\",                 \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"proto\",              \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"flag\",               \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"fwd\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"stos\",                  \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"ipkt\",               \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"ibytt\",              \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"opkt\",               \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"obyt\",               \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"input\",                 \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"output\",                \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"sas\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"das\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dtos\",                  \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dir\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"rip\",                \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ]\n"
+                           "}}')\n"
+                           ).format(self._db_name, hdfs_staging_path)
+        self._logger.info( "Creating external table: {0}".format(create_external))
+        self._cursor.execute_query(create_external)
+
+        insert_into_table = """
+        INSERT INTO TABLE {0}.flow
+        PARTITION (y={1}, m={2}, d={3}, h={4})
+        SELECT   treceived,  unix_timestamp(treceived) AS unix_tstamp, tryear,  trmonth, trday,  trhour,  trminute,  trsec,
+          tdur,  sip, dip, sport, dport,  proto,  flag,  fwd,  stos,  ipkt,  ibyt,  opkt,  obyt,  input,  output,
+          sas,  das,  dtos,  dir,  rip
+        FROM {0}.flow_tmp
+        """.format(self._db_name, flow_year, flow_month, flow_day, flow_hour)
+        self._logger.info( "Loading data to {0}: {1}"
+                           .format(self._db_name, insert_into_table)
+                           )
+        self._cursor.execute_query(insert_into_table)
 
         # remove from hdfs staging
-        rm_hdfs_staging_cmd = "hadoop fs -rm -R -skipTrash {0}".format(hdfs_staging_path)
-        self._logger.info("Removing staging path: {0}".format(rm_hdfs_staging_cmd))
-        Util.execute_cmd(rm_hdfs_staging_cmd,self._logger)
+        self._logger.info("Removing staging path: {0}".format(hdfs_staging_path))
+        hdfs.delete_folder(hdfs_staging_path)
 
         # remove from local staging.
         rm_local_staging = "rm {0}{1}".format(self._local_staging,file_name)
         self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
         Util.execute_cmd(rm_local_staging,self._logger)
 
+        rm_local_staging = "rm {0}".format(sf)
+        self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
+        Util.execute_cmd(rm_local_staging,self._logger)
+
         self._logger.info("File {0} was successfully processed.".format(file_name))
-
-
diff --git a/spot-ingest/pipelines/proxy/bluecoat.py b/spot-ingest/pipelines/proxy/bluecoat.py
index 31d89ca..d476733 100644
--- a/spot-ingest/pipelines/proxy/bluecoat.py
+++ b/spot-ingest/pipelines/proxy/bluecoat.py
@@ -1,3 +1,7 @@
+"""
+This script adds support for ingesting Bluecoat log files
+into Apache Spot.
+"""
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -28,121 +32,174 @@
 rex_date = re.compile("\d{4}-\d{2}-\d{2}")
 
 proxy_schema = StructType([
-                                    StructField("p_date", StringType(), True),
-                                    StructField("p_time", StringType(), True),
-                                    StructField("clientip", StringType(), True),
-                                    StructField("host", StringType(), True),
-                                    StructField("reqmethod", StringType(), True),
-                                    StructField("useragent", StringType(), True),
-                                    StructField("resconttype", StringType(), True),
-                                    StructField("duration", IntegerType(), True),
-                                    StructField("username", StringType(), True),
-                                    StructField("authgroup", StringType(), True),
-                                    StructField("exceptionid", StringType(), True),
-                                    StructField("filterresult", StringType(), True),
-                                    StructField("webcat", StringType(), True),
-                                    StructField("referer", StringType(), True),
-                                    StructField("respcode", StringType(), True),
-                                    StructField("action", StringType(), True),
-                                    StructField("urischeme", StringType(), True),
-                                    StructField("uriport", StringType(), True),
-                                    StructField("uripath", StringType(), True),
-                                    StructField("uriquery", StringType(), True),
-                                    StructField("uriextension", StringType(), True),
-                                    StructField("serverip", StringType(), True),
-                                    StructField("scbytes", IntegerType(), True),
-                                    StructField("csbytes", IntegerType(), True),
-                                    StructField("virusid", StringType(), True),
-                                    StructField("bcappname", StringType(), True),
-                                    StructField("bcappoper", StringType(), True),
-                                    StructField("fulluri", StringType(), True),
-                                    StructField("y", StringType(), True),
-                                    StructField("m", StringType(), True),
-                                    StructField("d", StringType(), True),
-                                    StructField("h", StringType(), True)])
+    StructField("p_date", StringType(), True),
+    StructField("p_time", StringType(), True),
+    StructField("clientip", StringType(), True),
+    StructField("host", StringType(), True),
+    StructField("reqmethod", StringType(), True),
+    StructField("useragent", StringType(), True),
+    StructField("resconttype", StringType(), True),
+    StructField("duration", IntegerType(), True),
+    StructField("username", StringType(), True),
+    StructField("authgroup", StringType(), True),
+    StructField("exceptionid", StringType(), True),
+    StructField("filterresult", StringType(), True),
+    StructField("webcat", StringType(), True),
+    StructField("referer", StringType(), True),
+    StructField("respcode", StringType(), True),
+    StructField("action", StringType(), True),
+    StructField("urischeme", StringType(), True),
+    StructField("uriport", StringType(), True),
+    StructField("uripath", StringType(), True),
+    StructField("uriquery", StringType(), True),
+    StructField("uriextension", StringType(), True),
+    StructField("serverip", StringType(), True),
+    StructField("scbytes", IntegerType(), True),
+    StructField("csbytes", IntegerType(), True),
+    StructField("virusid", StringType(), True),
+    StructField("bcappname", StringType(), True),
+    StructField("bcappoper", StringType(), True),
+    StructField("fulluri", StringType(), True),
+    StructField("y", StringType(), True),
+    StructField("m", StringType(), True),
+    StructField("d", StringType(), True),
+    StructField("h", StringType(), True)])
+
 
 def main():
-    
+    """
+    Parse the command-line arguments and
+    start the Bluecoat streaming parser.
+    """
     # input Parameters
     parser = argparse.ArgumentParser(description="Bluecoat Parser")
-    parser.add_argument('-zk','--zookeeper',dest='zk',required=True,help='Zookeeper IP and port (i.e. 10.0.0.1:2181)',metavar='')
-    parser.add_argument('-t','--topic',dest='topic',required=True,help='Topic to listen for Spark Streaming',metavar='')
-    parser.add_argument('-db','--database',dest='db',required=True,help='Hive database whete the data will be ingested',metavar='')
-    parser.add_argument('-dt','--db-table',dest='db_table',required=True,help='Hive table whete the data will be ingested',metavar='')
-    parser.add_argument('-w','--num_of_workers',dest='num_of_workers',required=True,help='Num of workers for Parallelism in Data Processing',metavar='')
-    parser.add_argument('-bs','--batch-size',dest='batch_size',required=True,help='Batch Size (Milliseconds)',metavar='')
+    parser.add_argument('-zk', '--zookeeper', dest='zk', required=True,
+                        help='Zookeeper IP and port (i.e. 10.0.0.1:2181)', metavar='')
+    parser.add_argument('-t', '--topic', dest='topic', required=True,
+                        help='Topic to listen for Spark Streaming', metavar='')
+    parser.add_argument('-db', '--database', dest='db', required=True,
+                        help='Hive database whete the data will be ingested', metavar='')
+    parser.add_argument('-dt', '--db-table', dest='db_table', required=True,
+                        help='Hive table whete the data will be ingested', metavar='')
+    parser.add_argument('-w', '--num_of_workers', dest='num_of_workers', required=True,
+                        help='Num of workers for Parallelism in Data Processing', metavar='')
+    parser.add_argument('-bs', '--batch-size', dest='batch_size', required=True,
+                        help='Batch Size (Milliseconds)', metavar='')
     args = parser.parse_args()
 
     # start collector based on data source type.
-    bluecoat_parse(args.zk,args.topic,args.db,args.db_table,args.num_of_workers,args.batch_size)
+    bluecoat_parse(args.zk, args.topic, args.db, args.db_table, args.num_of_workers, args.batch_size)
+
 
 def spot_decoder(s):
+    """
+    Pass-through decoder: returns the input unchanged (``None`` stays ``None``).
 
+    :param s: input to decode
+    :returns: s
+    """
     if s is None:
         return None
     return s
 
-def split_log_entry(line):
 
+def split_log_entry(line):
+    """
+    Split the given line into its fields.
+
+    :param line: line to split
+    :returns: list
+    """
     lex = shlex.shlex(line)
     lex.quotes = '"'
     lex.whitespace_split = True
     lex.commenters = ''
     return list(lex)
 
+
 def proxy_parser(proxy_fields):
-    
+    """
+    Parse and normalize data.
+
+    :param proxy_fields: list with fields from log
+    :returns: list
+    """
     proxy_parsed_data = []
 
     if len(proxy_fields) > 1:
 
         # create full URI.
-        proxy_uri_path =  proxy_fields[17] if  len(proxy_fields[17]) > 1 else ""
-        proxy_uri_qry =  proxy_fields[18] if  len(proxy_fields[18]) > 1 else ""
-        full_uri= "{0}{1}{2}".format(proxy_fields[15],proxy_uri_path,proxy_uri_qry)
+        proxy_uri_path = proxy_fields[17] if len(proxy_fields[17]) > 1 else ""
+        proxy_uri_qry = proxy_fields[18] if len(proxy_fields[18]) > 1 else ""
+        full_uri = "{0}{1}{2}".format(proxy_fields[15], proxy_uri_path, proxy_uri_qry)
         date = proxy_fields[0].split('-')
-        year =  date[0]
+        year = date[0]
         month = date[1].zfill(2)
         day = date[2].zfill(2)
         hour = proxy_fields[1].split(":")[0].zfill(2)
-        # re-order fields. 
-        proxy_parsed_data = [proxy_fields[0],proxy_fields[1],proxy_fields[3],proxy_fields[15],proxy_fields[12],proxy_fields[20],proxy_fields[13],int(proxy_fields[2]),proxy_fields[4],
-        proxy_fields[5],proxy_fields[6],proxy_fields[7],proxy_fields[8],proxy_fields[9],proxy_fields[10],proxy_fields[11],proxy_fields[14],proxy_fields[16],proxy_fields[17],proxy_fields[18],
-        proxy_fields[19],proxy_fields[21],int(proxy_fields[22]),int(proxy_fields[23]),proxy_fields[24],proxy_fields[25],proxy_fields[26],full_uri,year,month,day,hour ]
+        # re-order fields.
+        proxy_parsed_data = [proxy_fields[0], proxy_fields[1], proxy_fields[3],
+                             proxy_fields[15], proxy_fields[12], proxy_fields[20],
+                             proxy_fields[13], int(proxy_fields[2]), proxy_fields[4],
+                             proxy_fields[5], proxy_fields[6], proxy_fields[7],
+                             proxy_fields[8], proxy_fields[9], proxy_fields[10],
+                             proxy_fields[11], proxy_fields[14], proxy_fields[16],
+                             proxy_fields[17], proxy_fields[18], proxy_fields[19],
+                             proxy_fields[21], int(proxy_fields[22]), int(proxy_fields[23]),
+                             proxy_fields[24], proxy_fields[25], proxy_fields[26],
+                             full_uri, year, month, day, hour]
 
     return proxy_parsed_data
 
 
-def save_data(rdd,sqc,db,db_table,topic):
+def save_data(rdd, sqc, db, db_table, topic):
+    """
+    Create and save a data frame with the given data.
 
+    :param rdd: collection of objects (Resilient Distributed Dataset) to store
+    :param sqc: Spark `HiveContext` used to build and write the data frame
+    :param db: Apache Hive database to save into
+    :param db_table: table of `db` to save into
+    :param topic: Apache Kafka topic name, only printed in the "listening" message when `rdd` is empty
+    """
     if not rdd.isEmpty():
 
-        df = sqc.createDataFrame(rdd,proxy_schema)        
+        df = sqc.createDataFrame(rdd, proxy_schema)
         sqc.setConf("hive.exec.dynamic.partition", "true")
         sqc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
-        hive_table = "{0}.{1}".format(db,db_table)
-        df.write.format("parquet").mode("append").insertInto(hive_table)
+        hive_table = "{0}.{1}".format(db, db_table)
+        df.write.format("parquet").mode("append").partitionBy('y', 'm', 'd', 'h').insertInto(hive_table)
 
     else:
         print("------------------------LISTENING KAFKA TOPIC:{0}------------------------".format(topic))
 
-def bluecoat_parse(zk,topic,db,db_table,num_of_workers,batch_size):
-    
+
+def bluecoat_parse(zk, topic, db, db_table, num_of_workers, batch_size):
+    """
+    Parse and save bluecoat logs.
+
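+    :param zk: Apache ZooKeeper quorum, passed to `KafkaUtils.createStream`
+    :param topic: Apache Kafka topic to consume; also used as the Spark application name and Kafka consumer group id
+    :param db: Apache Hive database to save into
+    :param db_table: table of `db` to save into
+    :param num_of_workers: per-topic count given to `KafkaUtils.createStream` (converted with `int`)
+    :param batch_size: batch duration for the Apache Spark streaming context (converted with `int`; NOTE(review): PySpark documents this value in seconds - confirm against the CLI help)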
+    """
     app_name = topic
     wrks = int(num_of_workers)
 
     # create spark context
     sc = SparkContext(appName=app_name)
-    ssc = StreamingContext(sc,int(batch_size))
+    ssc = StreamingContext(sc, int(batch_size))
     sqc = HiveContext(sc)
 
     tp_stream = KafkaUtils.createStream(ssc, zk, app_name, {topic: wrks}, keyDecoder=spot_decoder, valueDecoder=spot_decoder)
 
-    proxy_data = tp_stream.map(lambda row: row[1]).flatMap(lambda row: row.split("\n")).filter(lambda row: rex_date.match(row)).map(lambda row: row.strip("\n").strip("\r").replace("\t", " ").replace("  ", " ")).map(lambda row:  split_log_entry(row)).map(lambda row: proxy_parser(row))
-    saved_data = proxy_data.foreachRDD(lambda row: save_data(row,sqc,db,db_table,topic))
-    ssc.start();
+    proxy_data = tp_stream.map(lambda row: row[1]).flatMap(lambda row: row.split("\n")).filter(lambda row: rex_date.match(row)).map(lambda row: row.strip("\n").strip("\r").replace("\t", " ").replace("  ", " ")).map(lambda row: split_log_entry(row)).map(lambda row: proxy_parser(row))
+    saved_data = proxy_data.foreachRDD(lambda row: save_data(row, sqc, db, db_table, topic))
+    ssc.start()
     ssc.awaitTermination()
 
 
-if __name__ =='__main__':
+if __name__ == '__main__':
     main()
diff --git a/spot-ingest/pipelines/proxy/collector.py b/spot-ingest/pipelines/proxy/collector.py
index 69d708c..008b568 100644
--- a/spot-ingest/pipelines/proxy/collector.py
+++ b/spot-ingest/pipelines/proxy/collector.py
@@ -23,7 +23,7 @@
 import sys
 import copy
 from common.utils import Util, NewFileEvent
-from common.kafka_client import KafkaTopic
+from common.kafka_client import KafkaProducer
 from multiprocessing import Pool
 from common.file_collector import FileWatcher
 import time
@@ -106,10 +106,10 @@
             for line in f:
                 message += line
                 if len(message) > message_size:
-                    KafkaTopic.SendMessage(message,kafka_servers,topic,0)
+                    KafkaProducer.SendMessage(message, kafka_servers, topic, 0)
                     message = ""
             #send the last package.        
-            KafkaTopic.SendMessage(message,kafka_servers,topic,0)            
+            KafkaProducer.SendMessage(message, kafka_servers, topic, 0)
         rm_file = "rm {0}".format(file)
         Util.execute_cmd(rm_file,logger)
         logger.info("File {0} has been successfully sent to Kafka Topic: {1}".format(file,topic))
diff --git a/spot-ingest/requirements.txt b/spot-ingest/requirements.txt
index 7d04054..71661bc 100644
--- a/spot-ingest/requirements.txt
+++ b/spot-ingest/requirements.txt
@@ -1,2 +1,5 @@
 watchdog
-kafka-python
+confluent-kafka
+impyla
+hdfs
+six >= 1.5
diff --git a/spot-ingest/start_ingest_standalone.sh b/spot-ingest/start_ingest_standalone.sh
index 0e3bfd5..5d7ce36 100755
--- a/spot-ingest/start_ingest_standalone.sh
+++ b/spot-ingest/start_ingest_standalone.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
@@ -58,7 +58,7 @@
 
 INGEST_DATE=`date +"%H_%M_%S"`
 
-screen -d -m -S SPOT-INGEST-${INGEST_CONF}-${INGEST_DATE}  -s /bin/bash
+screen -d -m -S SPOT-INGEST-${INGEST_CONF}-${INGEST_DATE}  -s `which bash`
 screen -S SPOT-INGEST-${INGEST_CONF}-${INGEST_DATE} -X setenv TZ ${TIME_ZONE}
 screen -dr  SPOT-INGEST-${INGEST_CONF}-${INGEST_DATE} -X screen -t Master sh -c "python master_collector.py -t ${INGEST_CONF} -w ${WORKERS_NUM} -id SPOT-INGEST-${INGEST_CONF}-${INGEST_DATE}; echo 'Closing Master...'; sleep 432000"
 
diff --git a/spot-ingest/worker.py b/spot-ingest/worker.py
index db51def..ce758c5 100755
--- a/spot-ingest/worker.py
+++ b/spot-ingest/worker.py
@@ -20,70 +20,78 @@
 import argparse
 import os
 import json
-import logging
 import sys
 from common.utils import Util
 from common.kerberos import Kerberos
 from common.kafka_client import KafkaConsumer
+import common.configurator as Config
 
-script_path = os.path.dirname(os.path.abspath(__file__))
-conf_file = "{0}/ingest_conf.json".format(script_path)
-worker_conf = json.loads(open (conf_file).read())
+SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
+CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
+WORKER_CONF = json.loads(open(CONF_FILE).read())
+
 
 def main():
 
     # input parameters
     parser = argparse.ArgumentParser(description="Worker Ingest Framework")
-    parser.add_argument('-t','--type',dest='type',required=True,help='Type of data that will be ingested (Pipeline Configuration)',metavar='')
-    parser.add_argument('-i','--id',dest='id',required=True,help='Worker Id, this is needed to sync Kafka and Ingest framework (Partition Number)',metavar='')
-    parser.add_argument('-top','--topic',dest='topic',required=True,help='Topic to read from.',metavar="")
-    parser.add_argument('-p','--processingParallelism',dest='processes',required=False,help='Processing Parallelism',metavar="")
+    parser.add_argument('-t', '--type', dest='type', required=True,
+                        help='Type of data that will be ingested (Pipeline Configuration)',
+                        metavar='')
+    parser.add_argument('-i', '--id', dest='id', required=True,
+                        help='Worker Id, this is needed to sync Kafka and Ingest framework (Partition Number)',
+                        metavar='')
+    parser.add_argument('-top', '--topic', dest='topic', required=True,
+                        help='Topic to read from.', metavar="")
+    parser.add_argument('-p', '--processingParallelism', dest='processes',
+                        required=False, help='Processing Parallelism', metavar="")
     args = parser.parse_args()
 
     # start worker based on the type.
-    start_worker(args.type,args.topic,args.id,args.processes)
+    start_worker(args.type, args.topic, args.id, args.processes)
 
 
-def start_worker(type,topic,id,processes=None):
+def start_worker(type, topic, id, processes=None):
 
     logger = Util.get_logger("SPOT.INGEST.WORKER")
 
     # validate the given configuration exists in ingest_conf.json.
-    if not type in worker_conf["pipelines"]:
-        logger.error("'{0}' type is not a valid configuration.".format(type));
+    if not type in WORKER_CONF["pipelines"]:
+        logger.error("'{0}' type is not a valid configuration.".format(type))
         sys.exit(1)
 
     # validate the type is a valid module.
-    if not Util.validate_data_source(worker_conf["pipelines"][type]["type"]):
-        logger.error("The provided data source {0} is not valid".format(type));sys.exit(1)
+    if not Util.validate_data_source(WORKER_CONF["pipelines"][type]["type"]):
+        logger.error("The provided data source {0} is not valid".format(type))
+        sys.exit(1)
 
-    # validate if kerberos authentication is requiered.
-    if os.getenv('KRB_AUTH'):
+    # validate if kerberos authentication is required.
+    if Config.kerberos_enabled():
         kb = Kerberos()
         kb.authenticate()
 
     # create a worker instance based on the data source type.
-    module = __import__("pipelines.{0}.worker".format(worker_conf["pipelines"][type]["type"]),fromlist=['Worker'])
+    module = __import__("pipelines.{0}.worker".format(WORKER_CONF["pipelines"][type]["type"]),
+                        fromlist=['Worker'])
 
     # kafka server info.
     logger.info("Initializing kafka instance")
-    k_server = worker_conf["kafka"]['kafka_server']
-    k_port = worker_conf["kafka"]['kafka_port']
+    k_server = WORKER_CONF["kafka"]['kafka_server']
+    k_port = WORKER_CONF["kafka"]['kafka_port']
 
     # required zookeeper info.
-    zk_server = worker_conf["kafka"]['zookeper_server']
-    zk_port = worker_conf["kafka"]['zookeper_port']
+    zk_server = WORKER_CONF["kafka"]['zookeper_server']
+    zk_port = WORKER_CONF["kafka"]['zookeper_port']
     topic = topic
 
     # create kafka consumer.
-    kafka_consumer = KafkaConsumer(topic,k_server,k_port,zk_server,zk_port,id)
+    kafka_consumer = KafkaConsumer(topic, k_server, k_port, zk_server, zk_port, id)
 
     # start worker.
-    db_name = worker_conf['dbname']
-    app_path = worker_conf['hdfs_app_path']
-    ingest_worker = module.Worker(db_name,app_path,kafka_consumer,type,processes)
+    db_name = WORKER_CONF['dbname']
+    app_path = WORKER_CONF['hdfs_app_path']
+    ingest_worker = module.Worker(db_name, app_path, kafka_consumer, type, processes)
     ingest_worker.start()
 
-if __name__=='__main__':
+if __name__ == '__main__':
     main()
-
diff --git a/spot-ml/build.sbt b/spot-ml/build.sbt
index 5ffc520..cc83ed0 100644
--- a/spot-ml/build.sbt
+++ b/spot-ml/build.sbt
@@ -23,9 +23,7 @@
 
 val sparkVersion = "2.1.0"
 
-import sbtassembly.Plugin.AssemblyKeys._
-
-assemblySettings
+baseAssemblySettings
 
 libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
 libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion
@@ -37,7 +35,7 @@
 
 val meta = """META.INF(.)*""".r
 
-mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => {
+assemblyMergeStrategy in assembly := {
   case PathList("org", "apache", "commons", xs@_*) => MergeStrategy.last
   case PathList("com", "esotericsoftware", "minlog", xs@_*) => MergeStrategy.last
   case PathList("com", "google", xs@_*) => MergeStrategy.last
@@ -50,7 +48,6 @@
   case meta(_) => MergeStrategy.discard
   case x => MergeStrategy.first
 }
-}
 
 // super important with multiple tests running spark Contexts
 parallelExecution in Test := false
@@ -69,4 +66,4 @@
   }
 }
 
-resourceGenerators in Compile <+= getTop1MFileFromAlexa
\ No newline at end of file
+resourceGenerators in Compile += getTop1MFileFromAlexa
diff --git a/spot-ml/ml_ops.sh b/spot-ml/ml_ops.sh
index dd00bbc..abe3d06 100755
--- a/spot-ml/ml_ops.sh
+++ b/spot-ml/ml_ops.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
diff --git a/spot-ml/ml_test.sh b/spot-ml/ml_test.sh
index 3036c93..7a4971a 100755
--- a/spot-ml/ml_test.sh
+++ b/spot-ml/ml_test.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
@@ -80,4 +80,4 @@
   --ldabeta ${LDA_BETA} \
   --ldaoptimizer ${LDA_OPTIMIZER} \
   --precision ${PRECISION} \
-  $USER_DOMAIN_CMD
\ No newline at end of file
+  $USER_DOMAIN_CMD
diff --git a/spot-ml/project/plugins.sbt b/spot-ml/project/plugins.sbt
index 55de9d9..30d2f56 100644
--- a/spot-ml/project/plugins.sbt
+++ b/spot-ml/project/plugins.sbt
@@ -15,4 +15,4 @@
  * limitations under the License.
  */
 
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.1")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
index acf8fc6..dd8cb36 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
@@ -31,7 +31,7 @@
 
 /**
   * The suspicious connections analysis of DNS log data develops a probabilistic model the DNS queries
-  * made by each client IP and flags those assigned a low probability as "suspicious"
+  * made by each client IP and flags those assigned a low probability as "suspicious".
   */
 
 object DNSSuspiciousConnectsAnalysis {
@@ -106,9 +106,11 @@
 
 
   /**
-    *
+    * Return all DNS records with valid values.
+    * 
     * @param inputDNSRecords raw DNS records.
     * @return
+    * @see filterInvalidRecords(DataFrame)
     */
   def filterRecords(inputDNSRecords: DataFrame): DataFrame = {
 
@@ -135,9 +137,11 @@
   }
 
   /**
+    * Return all DNS records with invalid values.
     *
     * @param inputDNSRecords raw DNS records.
     * @return
+    * @see filterRecords(DataFrame)
     */
   def filterInvalidRecords(inputDNSRecords: DataFrame): DataFrame = {
 
@@ -164,6 +168,7 @@
   }
 
   /**
+    * Get all DNS records which score below the given threshold.
     *
     * @param scoredDNSRecords scored DNS records.
     * @param threshold        score tolerance.
diff --git a/spot-oa/api/resources/configurator.py b/spot-oa/api/resources/configurator.py
index 5bda045..f8defc6 100644
--- a/spot-oa/api/resources/configurator.py
+++ b/spot-oa/api/resources/configurator.py
@@ -14,35 +14,77 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import ConfigParser
-import os
+from io import open
+
 
 def configuration():
 
-    conf_file = "/etc/spot.conf"
     config = ConfigParser.ConfigParser()
-    config.readfp(SecHead(open(conf_file)))
+    # Open /etc/spot.conf; the handle is wrapped in SecHead before readfp() consumes it.
+    try:
+        conf = open("/etc/spot.conf", "r")
+    except (OSError, IOError) as e:
+        # errno is an int: convert it so building the message cannot raise TypeError.
+        print("Error opening: spot.conf" + " error: " + str(e.errno))
+        raise e
+    config.readfp(SecHead(conf))
     return config
 
+
 def db():
-    conf = configuration()
-    return conf.get('conf', 'DBNAME').replace("'","").replace('"','')
+    return get_conf('DBNAME')
+
 
 def impala():
-    conf = configuration()
-    return conf.get('conf', 'IMPALA_DEM'),conf.get('conf', 'IMPALA_PORT')
+    return get_conf('IMPALA_DEM'), get_conf('IMPALA_PORT')
+
 
 def hdfs():
-    conf = configuration()
-    name_node = conf.get('conf',"NAME_NODE")
-    web_port = conf.get('conf',"WEB_PORT")
-    hdfs_user = conf.get('conf',"HUSER")
-    hdfs_user = hdfs_user.split("/")[-1].replace("'","").replace('"','')
-    return name_node,web_port,hdfs_user
+    return get_conf('NAME_NODE'), get_conf('WEB_PORT'), get_conf('HUSER').split("/")[-1]
+
 
 def spot():
+    return get_conf('HUSER')
+
+
+def kerberos_enabled():
+    enabled = get_conf('KERBEROS')
+    if enabled.lower() == 'true':
+        return True
+    else:
+        return False
+
+
+def kerberos():
+    if kerberos_enabled():
+        return get_conf('PRINCIPAL'), get_conf('KEYTAB'), get_conf('SASL_MECH'), get_conf('SECURITY_PROTO')
+    else:
+        raise KeyError
+
+
+def ssl_enabled():
+    enabled = get_conf('SSL')
+    if enabled.lower() == 'true':
+        return True
+    else:
+        return False
+
+
+def ssl():
+    if ssl_enabled():
+        return get_conf('SSL_VERIFY'), get_conf('CA_LOCATION'), get_conf('CERT'), get_conf('KEY')
+    else:
+        raise KeyError
+
+
+def get_conf(key):
     conf = configuration()
-    return conf.get('conf',"HUSER").replace("'","").replace('"','')
+    header = 'conf'
+    result = conf.get(header, key)
+    return result.replace("'", "").replace('"', '').encode('ascii', 'ignore')
+
 
 class SecHead(object):
     def __init__(self, fp):
diff --git a/spot-oa/api/resources/hdfs_client.py b/spot-oa/api/resources/hdfs_client.py
index 31c5eba..e7f6bec 100644
--- a/spot-oa/api/resources/hdfs_client.py
+++ b/spot-oa/api/resources/hdfs_client.py
@@ -14,63 +14,216 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from hdfs import InsecureClient
+
 from hdfs.util import HdfsError
+from hdfs import Client
+from hdfs.ext.kerberos import KerberosClient
+from requests import Session
 from json import dump
-import api.resources.configurator as Config
+from threading import Lock
+import logging
+import configurator as Config
+from sys import stderr
 
 
-def _get_client(user=None):
-    hdfs_nm,hdfs_port,hdfs_user = Config.hdfs()
-    client = InsecureClient('http://{0}:{1}'.format(hdfs_nm,hdfs_port), user= user if user else hdfs_user)
+class Progress(object):
+
+    """Basic progress tracker callback."""
+
+    def __init__(self, hdfs_path, nbytes):
+        self._data = {}
+        self._lock = Lock()
+        self._hpath = hdfs_path
+        self._nbytes = nbytes
+
+    def __call__(self):
+        with self._lock:
+            if self._nbytes >= 0:
+                self._data[self._hpath] = self._nbytes
+            else:
+                stderr.write('%s\n' % (sum(self._data.values()), ))
+
+
+class SecureKerberosClient(KerberosClient):
+
+    """A new client subclass for handling HTTPS connections with Kerberos.
+
+    :param url: URL to namenode.
+    :param cert: Local certificate. See `requests` documentation for details
+      on how to use this.
+    :param verify: Whether to check the host's certificate. WARNING: non production use only
+    :param \*\*kwargs: Keyword arguments passed to the default `Client`
+      constructor.
+
+    """
+
+    def __init__(self, url, mutual_auth, cert=None, verify='true', **kwargs):
+
+        self._logger = logging.getLogger("SPOT.INGEST.HDFS_client")
+        session = Session()
+
+        if verify == 'true':
+            self._logger.info('SSL verification enabled')
+            session.verify = True
+            if cert is not None:
+                self._logger.info('SSL Cert: ' + cert)
+                if ',' in cert:
+                    session.cert = [path.strip() for path in cert.split(',')]
+                else:
+                    session.cert = cert
+        elif verify == 'false':
+            session.verify = False
+
+        super(SecureKerberosClient, self).__init__(url, mutual_auth, session=session, **kwargs)
+
+
+class HdfsException(HdfsError):
+    def __init__(self, message):
+        super(HdfsException, self).__init__(message)
+        self.message = message
+
+
+def get_client(user=None):
+    # type: (object) -> Client
+    """Build an HDFS client from the spot configuration; `user` is accepted but not used here."""
+    logger = logging.getLogger('SPOT.INGEST.HDFS.get_client')
+    hdfs_nm, hdfs_port, hdfs_user = Config.hdfs()
+    conf = {'url': '{0}:{1}'.format(hdfs_nm, hdfs_port)}
+    # NOTE(review): url has no scheme here (the removed code used 'http://') - confirm NAME_NODE includes it.
+    if Config.ssl_enabled():
+        ssl_verify, ca_location, cert, key = Config.ssl()
+        conf.update({'verify': ssl_verify.lower()})
+        if cert:
+            conf.update({'cert': cert})
+    # NOTE(review): mutual_auth is a required SecureKerberosClient argument but is only set when Kerberos is on.
+    if Config.kerberos_enabled():
+        krb_conf = {'mutual_auth': 'OPTIONAL'}
+        conf.update(krb_conf)
+
+    # TODO: possible user parameter
+    logger.info('Client conf:')
+    for k,v in conf.iteritems():
+        logger.info(k + ': ' + v)
+
+    client = SecureKerberosClient(**conf)
+
     return client
 
-def get_file(hdfs_file):
-    client = _get_client()
+
+def get_file(hdfs_file, client=None):
+    if not client:
+        client = get_client()
+
     with client.read(hdfs_file) as reader:
         results = reader.read()
         return results
 
-def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False):
-    
+
+def upload_file(hdfs_fp, local_fp, overwrite=False, client=None):
+    if not client:
+        client = get_client()
+
     try:
-        client = _get_client()
+        result = client.upload(hdfs_fp, local_fp, overwrite=overwrite, progress=Progress)
+        return result
+    except HdfsError as err:
+        return err
+
+
+def download_file(hdfs_path, local_path, overwrite=False, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        client.download(hdfs_path, local_path, overwrite=overwrite)
+        return True
+    except HdfsError:
+        return False
+
+
+def mkdir(hdfs_path, client=None):
+    """Create hdfs_path via client.makedirs; return True on success, False on HdfsError."""
+    if not client:  # only build a client when the caller did not supply one
+        client = get_client()
+    try:
+        client.makedirs(hdfs_path)
+        return True
+    except HdfsError:
+        return False
+
+
+def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
+    if not client:
+        client = get_client()
+
+    try:
         hdfs_full_name = "{0}/{1}".format(hdfs_path,hdfs_file_name)
         with client.write(hdfs_full_name,append=append_file,overwrite=overwrite_file) as writer:
             for item in hdfs_file_content:
                 data = ','.join(str(d) for d in item)
                 writer.write("{0}\n".format(data))
         return True
-        
+
     except HdfsError:
         return False
 
-def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False):
-    
+
+def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
+    if not client:
+        client = get_client()
+
     try:
-        client = _get_client()
         hdfs_full_name = "{0}/{1}".format(hdfs_path,hdfs_file_name)
         with client.write(hdfs_full_name,append=append_file,overwrite=overwrite_file,encoding='utf-8') as writer:
-	        dump(hdfs_file_content, writer)
+            dump(hdfs_file_content, writer)
         return True
     except HdfsError:
         return False
-    
 
-def delete_folder(hdfs_file,user=None):
-    client = _get_client(user)
-    client.delete(hdfs_file,recursive=True)
 
-def list_dir(hdfs_path):
+def delete_folder(hdfs_file, user=None, client=None):
+    if not client:
+        client = get_client()
+
     try:
-        client = _get_client()
+        client.delete(hdfs_file,recursive=True)
+    except HdfsError:
+        return False
+
+
+def check_dir(hdfs_path, client=None):
+    """
+    Returns True if directory exists
+    Returns False if directory does not exist
+    : param hdfs_path: path to check
+    : object client: hdfs client object for persistent connection
+    """
+    if not client:
+        client = get_client()
+
+    try:
+        return None not in client.list(hdfs_path)
+    except HdfsError:
+        # client.list raises HdfsError for a missing path; report that as "does not exist".
+        return False
+
+
+def list_dir(hdfs_path, client=None):
+    if not client:
+        client = get_client()
+
+    try:
         return client.list(hdfs_path)
     except HdfsError:
         return {}
 
-def file_exists(hdfs_path,file_name):
-    files = list_dir(hdfs_path)
+
+def file_exists(hdfs_path, file_name, client=None):
+    """Return True when file_name is listed under hdfs_path (list_dir gives {} on HdfsError)."""
+    if not client:
+        client = get_client()
+    files = list_dir(hdfs_path, client)  # path first, then client, matching list_dir's signature
     if str(file_name) in files:
-	    return True
+        return True
     else:
         return False
diff --git a/spot-oa/api/resources/impala_engine.py b/spot-oa/api/resources/impala_engine.py
index b7d0148..542bbd0 100644
--- a/spot-oa/api/resources/impala_engine.py
+++ b/spot-oa/api/resources/impala_engine.py
@@ -15,15 +15,33 @@
 # limitations under the License.
 #
 from impala.dbapi import connect
-import api.resources.configurator as Config
+import api.resources.configurator as config
+
 
 def create_connection():
 
-    impala_host, impala_port =  Config.impala()
-    db = Config.db()
-    conn = connect(host=impala_host, port=int(impala_port),database=db)
+    impala_host, impala_port = config.impala()
+    conf = {}
+
+    # TODO: if using hive, kerberos service name must be changed, impyla sets 'impala' as default
+    service_name = {'kerberos_service_name': 'impala'}
+
+    if config.kerberos_enabled():
+        principal, keytab, sasl_mech, security_proto = config.kerberos()
+        conf.update({'auth_mechanism': 'GSSAPI',
+                     })
+
+    if config.ssl_enabled():
+        ssl_verify, ca_location, cert, key = config.ssl()
+        conf.update({'ca_cert': cert,
+                     'use_ssl': ssl_verify
+                     })
+
+    db = config.db()
+    conn = connect(host=impala_host, port=int(impala_port), database=db, **conf)
     return conn.cursor()
 
+
 def execute_query(query,fetch=False):
 
     impala_cursor = create_connection()
@@ -31,6 +49,7 @@
 
     return impala_cursor if not fetch else impala_cursor.fetchall()
 
+
 def execute_query_as_list(query):
 
     query_results = execute_query(query)
@@ -46,5 +65,3 @@
         row_result = {}
 
     return results
-
-
diff --git a/spot-oa/kerberos-requirements.txt b/spot-oa/kerberos-requirements.txt
new file mode 100644
index 0000000..ee4cae4
--- /dev/null
+++ b/spot-oa/kerberos-requirements.txt
@@ -0,0 +1,3 @@
+thrift_sasl==0.2.1
+sasl
+requests-kerberos
\ No newline at end of file
diff --git a/spot-oa/oa/components/data/hive.py b/spot-oa/oa/components/data/hive.py
index a7c1d4b..7d2eaa2 100644
--- a/spot-oa/oa/components/data/hive.py
+++ b/spot-oa/oa/components/data/hive.py
@@ -24,6 +24,7 @@
         self._pipeline = pipeline
 
     def query(self,query,output_file=None, delimiter=','):
+        # TODO: fix kerberos compatibility, use impyla
         hive_config = "set mapred.max.split.size=1073741824;set hive.exec.reducers.max=10;set hive.cli.print.header=true;"
         
         del_format = "| sed 's/[\t]/{0}/g'".format(delimiter)
diff --git a/spot-oa/oa/components/data/impala.py b/spot-oa/oa/components/data/impala.py
index bfc1c5a..10d1f5b 100644
--- a/spot-oa/oa/components/data/impala.py
+++ b/spot-oa/oa/components/data/impala.py
@@ -16,6 +16,8 @@
 #
 
 from subprocess import check_output
+from common import configurator
+
 
 class Engine(object):
 
@@ -24,17 +26,32 @@
         self._daemon_node = conf['impala_daemon']
         self._db = db
         self._pipeline = pipeline
-        impala_cmd = "impala-shell -i {0} --quiet -q 'INVALIDATE METADATA {1}.{2}'".format(self._daemon_node,self._db, self._pipeline)
+
+        if configurator.kerberos_enabled():
+            self._impala_shell = "impala-shell -k -i {0} --quiet".format(self._daemon_node)
+        else:
+            self._impala_shell = "impala-shell -i {0} --quiet".format(self._daemon_node)
+
+        impala_cmd = "{0} -q 'INVALIDATE METADATA {1}.{2}'".format(self._impala_shell, self._db, self._pipeline)
         check_output(impala_cmd,shell=True)
     
-        impala_cmd = "impala-shell -i {0} --quiet -q 'REFRESH {1}.{2}'".format(self._daemon_node,self._db, self._pipeline)
+        impala_cmd = "{0} -q 'REFRESH {1}.{2}'".format(self._impala_shell, self._db, self._pipeline)
         check_output(impala_cmd,shell=True)
 
     def query(self,query,output_file=None,delimiter=","):
 
         if output_file:
-            impala_cmd = "impala-shell -i {0} --quiet --print_header -B --output_delimiter='{1}' -q \"{2}\" -o {3}".format(self._daemon_node,delimiter,query,output_file)
+            impala_cmd = "{0} --print_header -B --output_delimiter='{1}' -q \"{2}\" -o {3}".format(
+                self._impala_shell,
+                delimiter,
+                query,
+                output_file
+            )
         else:
-            impala_cmd = "impala-shell -i {0} --quiet --print_header -B --output_delimiter='{1}' -q \"{2}\"".format(self._daemon_node,delimiter,query)
+            impala_cmd = "{0} --print_header -B --output_delimiter='{1}' -q \"{2}\"".format(
+                self._impala_shell,
+                delimiter,
+                query
+            )
 
         check_output(impala_cmd,shell=True)
diff --git a/spot-oa/oa/dns/dns_oa.py b/spot-oa/oa/dns/dns_oa.py
index 5982e8b..5023d7f 100644
--- a/spot-oa/oa/dns/dns_oa.py
+++ b/spot-oa/oa/dns/dns_oa.py
@@ -232,7 +232,7 @@
                     rep_results = {k: "{0}::{1}".format(rep_results.get(k, ""), result.get(k, "")).strip('::') for k in set(rep_results) | set(result)}
 
                 if rep_results:
-                    self._dns_scores = [ conn + [ rep_results[conn[key]] ]   for conn in self._dns_scores  ]
+                    self._dns_scores = [ conn + [ rep_results.get(key) ]    for conn in self._dns_scores  ]
                 else:
                     self._dns_scores = [ conn + [""]   for conn in self._dns_scores  ]
         else:
@@ -418,4 +418,4 @@
             query_to_insert=("""
                 INSERT INTO {0}.dns_ingest_summary PARTITION (y={1}, m={2}, d={3}) VALUES {4};
             """).format(self._db, yr, mn, dy, tuple(df_final))
-            impala.execute_query(query_to_insert)
\ No newline at end of file
+            impala.execute_query(query_to_insert)
diff --git a/spot-oa/oa/flow/flow_oa.py b/spot-oa/oa/flow/flow_oa.py
index 53cec6b..000d9d0 100644
--- a/spot-oa/oa/flow/flow_oa.py
+++ b/spot-oa/oa/flow/flow_oa.py
@@ -34,7 +34,6 @@
 from utils import Util, ProgressBar
 from components.data.data import Data
 from components.geoloc.geoloc import GeoLocalization
-from components.reputation.gti import gti
 from impala.util import as_pandas
 import time
 
@@ -267,37 +266,49 @@
         # read configuration.
         self._logger.info("Reading reputation configuration file: {0}".format(reputation_conf_file))
         rep_conf = json.loads(open(reputation_conf_file).read())
- 
-        if "gti" in rep_conf and os.path.isfile(rep_conf['gti']['refclient']):
-            rep_conf = rep_conf['gti']
-            # initialize gti module.
-            self._logger.info("Initializing GTI component")
-            flow_gti = gti.Reputation(rep_conf,self._logger)
 
-            # get all src ips.
+        # initialize reputation services.
+        self._rep_services = []
+        self._logger.info("Initializing reputation services.")
+        for service in rep_conf:
+             config = rep_conf[service]
+             module = __import__("components.reputation.{0}.{0}".format(service), fromlist=['Reputation'])
+             self._rep_services.append(module.Reputation(config,self._logger))
+
+	if self._rep_services :
+ 
+           # get all src ips.
             src_ip_index = self._conf["flow_score_fields"]["srcIP"]
             dst_ip_index = self._conf["flow_score_fields"]["dstIP"]
 
-            self._logger.info("Getting GTI reputation for src IPs")
             flow_scores_src = iter(self._flow_scores)
 
             # getting reputation for src IPs
             src_ips = [ conn[src_ip_index] for conn in flow_scores_src ]            
-            src_rep_results = flow_gti.check(src_ips)
+	    self._logger.info("Getting reputation for each service in config")
+            src_rep_results = {}
+	    for rep_service in self._rep_services:
+                # if more than one reputation service is defined, the last ip match remains after merge
+                # Example fb: returns an entry for every ip, including unknown ones
+                # which overwrites other services that have previously returned a match. Same for dstip
+                # In future should consider a weigted merge, or UX should support multiple reps per IP
+	        src_rep_results = dict(rep_service.check(src_ips).items() + src_rep_results.items())
 
-            self._logger.info("Getting GTI reputation for dst IPs")
             flow_scores_dst = iter(self._flow_scores)
 
             # getting reputation for dst IPs            
             dst_ips = [  conn[dst_ip_index] for conn in flow_scores_dst ]
-            dst_rep_results = flow_gti.check(dst_ips)
+            dst_rep_results = {}
+	    for rep_service in self._rep_services:
+                dst_rep_results = dict(rep_service.check(dst_ips).items() + dst_rep_results.items()) 
 
+	    
             flow_scores_final = iter(self._flow_scores)
 
             self._flow_scores = []
             flow_scores = [conn + [src_rep_results[conn[src_ip_index]]] + [dst_rep_results[conn[dst_ip_index]]] for conn in flow_scores_final ]
             self._flow_scores = flow_scores           
-            
+
         else:
             # add values to gtiSrcRep and gtiDstRep.
             flow_scores = iter(self._flow_scores)
@@ -460,9 +471,3 @@
                 
         else:
             self._logger.info("No data found for the ingest summary")
-
-
-
- 
-
-        
diff --git a/spot-oa/oa/utils.py b/spot-oa/oa/utils.py
index 2bed10e..8ac6555 100644
--- a/spot-oa/oa/utils.py
+++ b/spot-oa/oa/utils.py
@@ -22,115 +22,114 @@
 import sys
 import ConfigParser
 
+
 class Util(object):
-	
-	@classmethod
-	def get_logger(cls,logger_name,create_file=False):
-		
+    @classmethod
+    def get_logger(cls, logger_name, create_file=False):
 
-		# create logger for prd_ci
-		log = logging.getLogger(logger_name)
-		log.setLevel(level=logging.INFO)
-		
-		# create formatter and add it to the handlers
-		formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-		
-		if create_file:
-				# create file handler for logger.
-				fh = logging.FileHandler('oa.log')
-				fh.setLevel(level=logging.DEBUG)
-				fh.setFormatter(formatter)
-		# reate console handler for logger.
-		ch = logging.StreamHandler()
-		ch.setLevel(level=logging.DEBUG)
-		ch.setFormatter(formatter)
+        # create logger for prd_ci
+        log = logging.getLogger(logger_name)
+        log.setLevel(level=logging.INFO)
 
-		# add handlers to logger.
-		if create_file:
-			log.addHandler(fh)
+        # create formatter and add it to the handlers
+        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 
-		log.addHandler(ch)
-		return  log
+        if create_file:
+            # create file handler for logger.
+            fh = logging.FileHandler('oa.log')
+            fh.setLevel(level=logging.DEBUG)
+            fh.setFormatter(formatter)
+        # reate console handler for logger.
+        ch = logging.StreamHandler()
+        ch.setLevel(level=logging.DEBUG)
+        ch.setFormatter(formatter)
 
-	@classmethod
-	def get_spot_conf(cls):
-		
-		conf_file = "/etc/spot.conf"
-		config = ConfigParser.ConfigParser()
-		config.readfp(SecHead(open(conf_file)))	
+        # add handlers to logger.
+        if create_file:
+            log.addHandler(fh)
 
-		return config
-	
-	@classmethod
-	def create_oa_folders(cls,type,date):		
+        log.addHandler(ch)
+        return log
 
-		# create date and ingest summary folder structure if they don't' exist.
-		root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-		data_type_folder = "{0}/data/{1}/{2}"
-		if not os.path.isdir(data_type_folder.format(root_path,type,date)): os.makedirs(data_type_folder.format(root_path,type,date))
-		if not os.path.isdir(data_type_folder.format(root_path,type,"ingest_summary")): os.makedirs(data_type_folder.format(root_path,type,"ingest_summary"))
+    @classmethod
+    def get_spot_conf(cls):
 
-		# create ipynb folders.
-		ipynb_folder = "{0}/ipynb/{1}/{2}".format(root_path,type,date)
-		if not os.path.isdir(ipynb_folder): os.makedirs(ipynb_folder)
+        conf_file = "/etc/spot.conf"
+        config = ConfigParser.ConfigParser()
+        config.readfp(SecHead(open(conf_file)))
 
-		# retun path to folders.
-		data_path = data_type_folder.format(root_path,type,date)
-		ingest_path = data_type_folder.format(root_path,type,"ingest_summary")		
-		return data_path,ingest_path,ipynb_folder
-	
-	@classmethod
-	def get_ml_results_form_hdfs(cls,hdfs_file_path,local_path):
+        return config
 
-		# get results from hdfs.
-		get_results_cmd = "hadoop fs -get {0} {1}/.".format(hdfs_file_path,local_path)
-		subprocess.call(get_results_cmd,shell=True)
-		return get_results_cmd
+    @classmethod
+    def create_oa_folders(cls, type, date):
 
-	@classmethod
-	def read_results(cls,file,limit, delimiter=','):
-		
-		# read csv results.
-		result_rows = []
-		with open(file, 'rb') as results_file:
-			csv_reader = csv.reader(results_file, delimiter = delimiter)
-			for i in range(0, int(limit)):
-				try:
-					row = csv_reader.next()
-				except StopIteration:
-					return result_rows
-				result_rows.append(row)
-		return result_rows
+        # create date and ingest summary folder structure if they don't' exist.
+        root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+        data_type_folder = "{0}/data/{1}/{2}"
+        if not os.path.isdir(data_type_folder.format(root_path, type, date)): os.makedirs(
+            data_type_folder.format(root_path, type, date))
+        if not os.path.isdir(data_type_folder.format(root_path, type, "ingest_summary")): os.makedirs(
+            data_type_folder.format(root_path, type, "ingest_summary"))
 
-	@classmethod
-	def ip_to_int(self,ip):
-		
-		try:
-			o = map(int, ip.split('.'))
-			res = (16777216 * o[0]) + (65536 * o[1]) + (256 * o[2]) + o[3]
-			return res    
+        # create ipynb folders.
+        ipynb_folder = "{0}/ipynb/{1}/{2}".format(root_path, type, date)
+        if not os.path.isdir(ipynb_folder): os.makedirs(ipynb_folder)
 
-		except ValueError:
-			return None
-	
-	
-	@classmethod
-	def create_csv_file(cls,full_path_file,content,delimiter=','):   
-		with open(full_path_file, 'w+') as u_file:
-			writer = csv.writer(u_file, quoting=csv.QUOTE_NONE, delimiter=delimiter)
-			writer.writerows(content)
+        # retun path to folders.
+        data_path = data_type_folder.format(root_path, type, date)
+        ingest_path = data_type_folder.format(root_path, type, "ingest_summary")
+        return data_path, ingest_path, ipynb_folder
 
+    @classmethod
+    def get_ml_results_form_hdfs(cls, hdfs_file_path, local_path):
 
-	@classmethod
-    	def cast_val(self,value):
-       	    try: 
-            	val = int(value) 
+        # get results from hdfs.
+        get_results_cmd = "hadoop fs -get {0} {1}/.".format(hdfs_file_path, local_path)
+        subprocess.call(get_results_cmd, shell=True)
+        return get_results_cmd
+
+    @classmethod
+    def read_results(cls, file, limit, delimiter=','):
+
+        # read csv results.
+        result_rows = []
+        with open(file, 'rb') as results_file:
+            csv_reader = csv.reader(results_file, delimiter=delimiter)
+            for i in range(0, int(limit)):
+                try:
+                    row = csv_reader.next()
+                except StopIteration:
+                    return result_rows
+                result_rows.append(row)
+        return result_rows
+
+    @classmethod
+    def ip_to_int(self, ip):
+
+        try:
+            o = map(int, ip.split('.'))
+            res = (16777216 * o[0]) + (65536 * o[1]) + (256 * o[2]) + o[3]
+            return res
+
+        except ValueError:
+            return None
+
+    @classmethod
+    def create_csv_file(cls, full_path_file, content, delimiter=','):
+        with open(full_path_file, 'w+') as u_file:
+            writer = csv.writer(u_file, quoting=csv.QUOTE_NONE, delimiter=delimiter)
+            writer.writerows(content)
+
+    @classmethod
+    def cast_val(self, value):
+        try:
+            val = int(value)
+        except:
+            try:
+                val = float(value)
             except:
-            	try:
-                    val = float(value) 
-            	except:
-                    val = str(value) 
-            return val    
+                val = str(value)
+        return val
 
 
 class SecHead(object):
@@ -140,47 +139,38 @@
 
     def readline(self):
         if self.sechead:
-            try: 
+            try:
                 return self.sechead
-            finally: 
+            finally:
                 self.sechead = None
-        else: 
+        else:
             return self.fp.readline()
 
+
 class ProgressBar(object):
+    def __init__(self, total, prefix='', sufix='', decimals=2, barlength=60):
+        self._total = total
+        self._prefix = prefix
+        self._sufix = sufix
+        self._decimals = decimals
+        self._bar_length = barlength
+        self._auto_iteration_status = 0
 
-	def __init__(self,total,prefix='',sufix='',decimals=2,barlength=60):
+    def start(self):
+        self._move_progress_bar(0)
 
-		self._total = total
-		self._prefix = prefix
-		self._sufix = sufix
-		self._decimals = decimals
-		self._bar_length = barlength
-		self._auto_iteration_status = 0
+    def update(self, iterator):
+        self._move_progress_bar(iterator)
 
-	def start(self):
+    def auto_update(self):
+        self._auto_iteration_status += 1
+        self._move_progress_bar(self._auto_iteration_status)
 
-		self._move_progress_bar(0)
-	
-	def update(self,iterator):
-		
-		self._move_progress_bar(iterator)
+    def _move_progress_bar(self, iteration):
+        filledLength = int(round(self._bar_length * iteration / float(self._total)))
+        percents = round(100.00 * (iteration / float(self._total)), self._decimals)
+        bar = '#' * filledLength + '-' * (self._bar_length - filledLength)
+        sys.stdout.write("{0} [{1}] {2}% {3}\r".format(self._prefix, bar, percents, self._sufix))
+        sys.stdout.flush()
 
-	def auto_update(self):
-
-		self._auto_iteration_status += 1		
-		self._move_progress_bar(self._auto_iteration_status)
-	
-	def _move_progress_bar(self,iteration):
-
-		filledLength    = int(round(self._bar_length * iteration / float(self._total)))
-		percents        = round(100.00 * (iteration / float(self._total)), self._decimals)
-		bar             = '#' * filledLength + '-' * (self._bar_length - filledLength)	
-		sys.stdout.write("{0} [{1}] {2}% {3}\r".format(self._prefix, bar, percents, self._sufix))		
-		sys.stdout.flush()
-		
-		if iteration == self._total:print("\n")
-
-		
-	
-
+        if iteration == self._total: print("\n")
diff --git a/spot-oa/requirements.txt b/spot-oa/requirements.txt
index 9f3afb8..2596e64 100644
--- a/spot-oa/requirements.txt
+++ b/spot-oa/requirements.txt
@@ -16,7 +16,7 @@
 # GraphQL API dependencies
 flask
 flask-graphql
-graphql-core
+graphql-core == 1.1.0
 urllib3
 
 # API Resources
@@ -24,3 +24,5 @@
 thrift==0.9.3
 impyla
 hdfs
+requests
+
diff --git a/spot-oa/runIpython.sh b/spot-oa/runIpython.sh
index 38a4121..26eaeff 100755
--- a/spot-oa/runIpython.sh
+++ b/spot-oa/runIpython.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
diff --git a/spot-setup/APACHE-SPOT-SCHEMA.md b/spot-setup/APACHE-SPOT-SCHEMA.md
new file mode 100644
index 0000000..f6ba142
--- /dev/null
+++ b/spot-setup/APACHE-SPOT-SCHEMA.md
@@ -0,0 +1,436 @@
+# Apache Spot Schema 
+
+This document centralizes information about the Proxy, DNS and Flow schemas used by Apache Spot. Users who run their own ingest can use it to implement a module without spot-ingest, or to compare their data against what Spot expects, by creating a data set with the columns each pipeline requires.
+
+- [Proxy](#proxy)
+
+    Attributes and rules.
+    * [Proxy Schema for spot-ingest](#proxy-schema-for-spot-ingest) 
+    * [Proxy Schema for spot-ml](#proxy-schema-for-spot-ml)
+    * [Proxy Schema for spot-oa](#proxy-schema-for-spot-oa)
+    * [Proxy Schema for spot-ui](#proxy-schema-for-spot-ui)
+
+- [Flow (spot-nfdump)](#flow-spot-nfdump)
+
+    Attributes and rules.
+    * [Flow Schema for spot-ingest](#flow-schema-for-spot-ingest) 
+    * [Flow Schema for spot-ml](#flow-schema-for-spot-ml)
+    * [Flow Schema for spot-oa](#flow-schema-for-spot-oa)
+    * [Flow Schema for spot-ui](#flow-schema-for-spot-ui)
+
+- [DNS](#dns)
+
+    Attributes and rules.
+    * [DNS Schema for spot-ingest](#dns-schema-for-spot-ingest) 
+    * [DNS Schema for spot-ml](#dns-schema-for-spot-ml)
+    * [DNS Schema for spot-oa](#dns-schema-for-spot-oa)
+    * [DNS Schema for spot-ui](#dns-schema-for-spot-ui)
+
+## Proxy
+The table shows the list of attributes used in proxy. A :white_check_mark: in a pipeline's column indicates that the field is used by that pipeline.  
+
+|Spot Field Name  |Type    |Description                                  |Original Field Name  |Format      |Spot-ingest       |Spot-ml           |Spot-oa           |Spot-ui           | 
+|---------------- |--------|---------------------------------------------|---------------------|------------|------------------|------------------|------------------|------------------|
+| p_date          | string | Date for the connection                     |        date         | yyyy-mm-dd |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| p_time	      | string | Time for the connection	                 |        time	       |  hh:MM:SS  |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| clientip        | string |IP address of the client sending the request |        c-ip	       | ip address	|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| host        	  | string |Hostname from the client's request URL	     |       cs-host	   |    text	|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| reqmethod	      | string |Request method used from client to appliance (HTTP Method - GET, POST, CONNECT) |	cs-method | 	text |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| useragent	      | string |Browser Type	                             | cs(User-Agent)	   |quoted text	|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| resconttype	  | string |Content-type (Ex. text/html, image/xml)	     |rs(Content-Type) 	   | text	    |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| duration	      |  int   |Duration of the connection	                 |time-taken	       |numerical	|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| username	      |string  |Client Username	                             |cs-username	       |text	    |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| authgroup   	  |string  |Client Authentication Group	                 |cs-auth-group 	   |text	    |:white_check_mark:|	-	    |     -    |       -  |
+| exceptionid	  |string  |Identifier of the exception resolved (empty if the transaction has not been terminated) |	x-exception-id 	| text	|:white_check_mark:|- | - |     -         |	
+| filterresult    |string  |Content filtering result: Denied, Proxied or Observed | sc-filter-result | text |:white_check_mark:|     -        |        -        |      -   |			
+| webcat	      |string  |All content categories of the request URL	 |cs-categories        |quoted text	|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| referer	      |string  |Request header: Referer                      | cs(Referer)         | url        |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| respcode	      |string  |Protocol status code from appliance to client (HTTP Response Codes) | sc-status | numerical |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| action	      |string  |What type of action did the Appliance take to process this request; possible values include ALLOWED, DENIED, FAILED, SERVER_ERROR|s-action |text |:white_check_mark:| -| -|- | 			
+| urischeme	      |string  |Scheme of the original URL requested	     |cs-uri-scheme 	   |text	    |:white_check_mark:|      -       |     -           |      -          |		
+| uriport	      |string  |Port from the original URL requested	     |cs-uri-port 	       |numerical	|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| uripath	      |string  |Path of the original URL requested without query |cs-uri-path 	   |text	    |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| uriquery	      |string  |Query from the original URL requested	     |cs-uri-query	       |text	    |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| uriextension	  |string  |Document extension from the original URL requested |cs-uri-extension |text	    |:white_check_mark:|      -       |     -           |      -          |		
+| serverip	      |string  |IP address of the appliance on which the client established its connection |s-ip  |ip address |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| scbytes	      |int	   |Number of bytes sent from appliance to client|sc-bytes             |numerical	|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| csbytes	      |int	   |Number of bytes sent from client to appliance|cs-bytes 	           |numerical	|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| virusid	      |string  |x-virus-id 	                                 |x-virus-id 	       |text	    |:white_check_mark:|    -         |    -            |       -         |		
+| bcappname	      |string  |x-bluecoat-application-name 	             |x-bluecoat-application-name |quoted text |:white_check_mark:|  -        |  -              |    -            |			
+| bcappoper	      |string  |x-bluecoat-application-operation	         |x-bluecoat-application-operation |quoted text |:white_check_mark:|-     |   -             |    -            |			
+|fulluri	      |string  |Full URI concatenated from cs-host, cs-uri-path, cs-uri-query fields |it does not exist, it is calculated during ingest |text |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| word 	          |string  |      -                					     |           -          |   -       |     -            |  -               |:white_check_mark:|      -          |
+| ml_score	      |float   |				-	                         |          -           |      -    |        -         |   -              |:white_check_mark:|  -              |
+| respcode_name   |string  |IANA translation for the response code column|                -     |     -     |      -           |       -          |:white_check_mark:|:white_check_mark:|
+| uri_rep	      |string  |Reputation value according to Threat intelligence services| 	-   |	-		|        -         |  -               |:white_check_mark:|:white_check_mark:|
+| network_context |string  |User defined value					         |              -       |     -     |       -          |     -            |:white_check_mark:|:white_check_mark:| 
+
+
+## Flow (spot-nfdump) 
+The table shows the list of attributes used in flow. A :white_check_mark: in a pipeline's column indicates that the field is used by that pipeline.  
+
+|Spot Field Name  |Type    |Description                                  |Original NFDUMP Field Name           |Format                   |Spot-ingest       |Spot-ml           |Spot-oa           |Spot-ui           | 
+|---------------- |--------|---------------------------------------------|-------------------------------------|-------------------------|------------------|------------------|------------------|------------------|
+| treceived  	  | string | Time the flow was received by the collector | tr	                               |YYYY-mm-DD HH:MM:SS      |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| unix_tstamp	  | bigint | treceived epoch time	                     |it is calculated by ingest hql script|number (1471431305)      |:white_check_mark:| -                | -                | -                |                     
+| tryear     	  | int    | treceived year 	                         |it is calculated by spot-nfdump	   |numerical                |:white_check_mark:|:white_check_mark:| -                | -                |
+| trmonth    	  | int    | treceived month	                         |it is calculated by spot-nfdump	   |numerical                |:white_check_mark:|:white_check_mark:| -                | -                |
+| trday      	  | int    | treceived day	                             |it is calculated by spot-nfdump	   |numerical                |:white_check_mark:|:white_check_mark:| -                | -                |
+| trhour     	  | int    | treceived hour	                             |it is calculated by spot-nfdump	   |numerical                |:white_check_mark:|:white_check_mark:| -                | -                |
+| trminute   	  | int    | treceived minute	                         |it is calculated by spot-nfdump	   |numerical                |:white_check_mark:|:white_check_mark:| -                | -                |
+| trsec      	  | int    | treceived seconds	                         |it is calculated by spot-nfdump	   |numerical                |:white_check_mark:|:white_check_mark:| -                | -                |
+| tdur       	  | float  | Duration	                                 | td	                               |xx.xx (18.04400062561035)|:white_check_mark:|:white_check_mark:| -                | -                |
+| sip        	  | string | Source IP Address          	             | sa	                               |ip address dotted decimal|:white_check_mark:|:white_check_mark:| -                | -                |
+| dip        	  | string | Destination IP Address	                     | da	                               |ip address dotted decimal|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| sport      	  | int    | Source Port	                             | sap	                               |numerical                |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| dport      	  | int    | Destination Port	                         | dap	                               |numerical                |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| proto      	  | string | Protocol	                                 | pr	                               |text (UDP, TCP, etc)     |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| flag       	  | string | TCP Flags	                                 | flg	                               |dotted flag representation (.A....)|:white_check_mark:| -      |:white_check_mark:|:white_check_mark:|   
+| fwd        	  | int    | Forwarding Status	                         | fwd	                               |numerical                |:white_check_mark:| -                | -                | -                |
+| stos       	  | int    | Source Tos (DSCP)	                         | stos	                               |numerical                |:white_check_mark:| -                |:white_check_mark:|:white_check_mark:|
+| ipkt       	  | bigint | Input Packets	                             | ipkt	                               |numerical                |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| ibyt       	  | bigint | Input Bytes	                             | ibyt	                               |numerical                |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| opkt       	  | bigint | Output Packets	                             | opkt	                               |numerical                |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| obyt       	  | bigint | Output Bytes	                             | obyt	                               |numerical                |:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|
+| input      	  | int    | Input interface SNMP number	             | in	                               |numerical                |:white_check_mark:| -                |:white_check_mark:|:white_check_mark:| 
+| output     	  | int    | Output interface SNMP number	             | out	                               |numerical                |:white_check_mark:| -                |:white_check_mark:|:white_check_mark:|
+| sas        	  | int    | Source AS number	                         | sas	                               |numerical                |:white_check_mark:| -                | -                | -                |
+| das        	  | int    | Destination AS number	                     | das	                               |numerical                |:white_check_mark:| -                | -                | -                |
+| dtos       	  | int    | Destination Tos (DSCP)	                     | dtos	                               |numerical                |:white_check_mark:| -                | -                | -                |  
+| dir        	  | int    | direction	                                 | dir	                               |numerical (0,1)          |:white_check_mark:| -                | -                | -                | 
+| rip        	  | string | Router IP	                                 | ra	                               |ip address dotted decimal|:white_check_mark:| -                |:white_check_mark:|:white_check_mark:|
+| ML_score	      |float   | Score assigned by ML - Produced by ML		 |                                     |numerical			     |                  |                  |:white_check_mark:|                  |	
+| rank	          | int    | Rank number based on the order of ML_score values - Produced by OA |		       |numerical				 |                  | -                | -                |:white_check_mark:|
+| srcip_internal  |int     | Boolean value to identify an internal source IP - Produced by OA |                |                         | -                | -                | -                |:white_check_mark:|
+| dstip_internal  |int     | Boolean value to identify an internal destination IP - Produced by OA |		   |                         | -                | -                | -			      |:white_check_mark:|
+| src_geoloc	  |string  | Lat & Long values of the source IP - Produced by OA |						       |                         | -                | -                | -                |:white_check_mark:|
+| dst_geoloc	  |string  |Lat & Long values of the destination IP - Produced by OA |						   |                         | -                | -                | -                |:white_check_mark:|
+| src_domain	  |string  |Domain assigned to the source IP - Produced by OA |                                |						 | -                | -                | -                |:white_check_mark:|
+| dst_domain	  |string  |Domain assigned to the destination IP - Produced by OA |						   |                         | -                | -                | -                |:white_check_mark:|
+| src_rep	      |string  |Collection of reputation values assigned to the source IP from different TI services - Produced by OA | |    | -                | -				   | -                |:white_check_mark:|
+| dst_rep	      |string  |Collection of reputation values  assigned to the destination IP from different TI services - Produced by OA||| -                | -                | -         		  |:white_check_mark:|
+
+
+## DNS 
+The table shows the list of attributes used in DNS. A :white_check_mark: in a pipeline's column indicates that the field is used by that pipeline.  
+
+|Spot Field Name  |Type    |Description                                  |Original Tshark Field Name           |Format                   |Spot-ingest       |Spot-ml           |Spot-oa           |Spot-ui           | 
+|---------------- |--------|---------------------------------------------|-------------------------------------|-------------------------|------------------|------------------|------------------|------------------|
+| frame_time   	  |string  |Tshark Frame Time received	                 |frame.time	          |Ex. Jan  4 2017 04:41:06.337519000 UTC|:white_check_mark:|:white_check_mark:|:white_check_mark:| -                |		
+| unix_tstamp  	  |bigint  |Tshark Frame Time received epoch format      |frame.time_epoch	                   |numerical (1483504866)	 |:white_check_mark:|:white_check_mark:|:white_check_mark:| -                |	
+| frame_len    	  |int     |Tshark Frame Length	                         |frame.len	                           |numerical	             |:white_check_mark:|:white_check_mark:|:white_check_mark:| -                |		
+| ip_dst       	  |string  |Tshark IP destination (Client IP)	         |ip.dst	                           |ip address dotted decimal|:white_check_mark:|:white_check_mark:|:white_check_mark:| -                |		
+| ip_src       	  |string  |Tshark IP source (DNS Server IP)	         |ip.src	                           |ip address dotted decimal|-                 | -                |:white_check_mark:| -                |  				
+| dns_qry_name 	  |string  |Tshark DNS Query Name	                     |dns.qry.name	                       |text	                 |:white_check_mark:|:white_check_mark:|:white_check_mark:| -                |		
+| dns_qry_class	  |string  |Tshark DNS Query Class	                     |dns.qry.class	                       |hexadecimal (0x00000001) |:white_check_mark:|:white_check_mark:|:white_check_mark:| -                |		
+| dns_qry_type 	  |int     |Tshark DNS Query Type	                     |dns.qry.type	                       |numerical	             |:white_check_mark:|:white_check_mark:|:white_check_mark:| -                |		
+| dns_qry_rcode	  |int     |Tshark DNS Query Response Code	             |dns.flags.rcode	                   |numerical	             |:white_check_mark:|:white_check_mark:|:white_check_mark:| -                |	
+| dns_a        	  |string  |Tshark DNS Query A Record	                 |dns.a	                               |text                     | -                | -                | -                |:white_check_mark:|
+|ML_score	      |float   |Produced by ML                               |                                     |                         | -                | -                |:white_check_mark:|:white_check_mark:| 
+|tld	          |string  |Top level domain obtained from query name column - Produced by OA |                |                         | -                | -                | -                |:white_check_mark:|
+|query_rep	      |string  |Collection of reputation values assigned to the destination IP from different TI services - Produced by OA|| | -                | -                | -                |:white_check_mark:|
+|hh	              |int     |Obtained from frame time column - Produced by OA |                                 |                         | -                | -                | -                |:white_check_mark:|
+|dns_qry_class_name|string |Translation for the query class code - Produced by OA |                            |                         | -                | -                | -                |:white_check_mark:|
+|dns_qry_type_name|string  |Translation for the query type code - Produced by OA |                             |                         | -                | -                | -                |:white_check_mark:|
+|dns_qry_rcode_name|string |Translation for the query response code - Produced by OA |                         |                         | -                | -                | -                |:white_check_mark:|
+|network_context  |string  |Value to identify the destination IP as internal to the network - Produced by OA | |                         | -                | -                | -                |:white_check_mark:|
+
+
+### Proxy Schema for spot-ingest
+The table shows proxy schema attributes and the rules used specifically for ingest.
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| p_date          | -                   | -                      |
+| p_time          | -                   | -                      |
+| clientip        | -                   | -                      |
+| host            | -                   | -                      |
+| reqmethod       | -                   | -                      |
+| useragent       | -                   | -                      |
+| resconttype     | -                   | -                      |
+| duration        | -                   | -                      |
+| username        | -                   | -                      |
+| authgroup       | -                   | -                      | 
+| exceptionid     | -                   | -                      |
+| filterresult    | -                   | -                      | 
+| webcat          | -                   | -                      |
+| referer         | -                   | -                      |
+| respcode        | -                   | -                      |
+| action          | -                   | -                      |
+| urischeme       | -                   | -                      |
+| uriport         | -                   | -                      |
+| uripath         | -                   | -                      |    
+| uriquery        | -                   | -                      |
+| uriextension    | -                   | -                      |
+| serverip        | -                   | -                      |
+| scbytes         | -                   | -                      |
+| csbytes         | -                   | -                      |
+| virusid         | -                   | -                      | 
+| bcappname       | -                   | -                      |
+| bcappoper       | -                   | -                      |
+| fulluri         | -                   | produced by ingest     |
+
+
+### Proxy Schema for spot-ml
+The table shows proxy schema attributes and the rules used specifically for machine learning (ml).
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| p_date          | Can't be null       | -                      |
+| p_time          | Can't be null       | -                      |
+| clientip        | Can't be null       | -                      |
+| host            | Can't be null       | -                      |
+| reqmethod       | -                   | -                      |
+| useragent       | -                   | Null will be replaced with "-" |
+| resconttype     | -                   | Null will be replaced with "-" |
+| duration        | -                   | -                      |
+| username        | -                   | -                      |
+| webcat          | -                   | -                      |
+|referer          | -                   | -                      |
+|respcode         | -                   | -                      |
+|uriport          | -                   | -                      |
+|uripath          | -                   | -                      |    
+|uriquery         | -                   | -                      |
+|serverip         | -                   | -                      |
+|scbytes          | -                   | -                      |
+|csbytes          | -                   | -                      |
+|fulluri          | Can't be null       | -                      |
+
+
+### Proxy Schema for spot-oa
+The table shows proxy schema attributes and the rules used specifically for operational analytics (oa).
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| p_date          | -                   | -                      |
+| p_time          | -                   | -                      |
+| clientip        | -                   | -                      |
+| host            | -                   | -                      |
+| reqmethod       | -                   | -                      |
+| useragent       | -                   | -                      |
+| resconttype     | -                   | -                      |
+| duration        | -                   | -                      |
+| username        | -                   | -                      |
+| webcat          | -                   | -                      |
+|referer          | -                   | -                      |
+|respcode         | -                   | -                      |
+|uriport          | -                   | -                      |
+|uripath          | -                   | -                      |    
+|uriquery         | -                   | -                      |
+|serverip         | -                   | -                      |
+|scbytes          | -                   | -                      |
+|csbytes          | -                   | -                      |
+|fulluri          | -                   | -                      |
+| word            | -                   | -                      |
+| ml_score        | -                   | -                      |
+| respcode_name   | -                   | Produced by OA         |
+| uri_rep         | -                   | Produced by OA         |
+| network_context | -                   | Produced by OA         |
+
+
+### Proxy Schema for spot-ui
+The table shows proxy schema attributes and the rules used specifically for user interface (ui).
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| p_date          | -                   | -                      |
+| p_time          | -                   | -                      |
+| clientip        | -                   | -                      |
+| host            | -                   | -                      |
+| reqmethod       | -                   | -                      |
+| useragent       | -                   | -                      |
+| resconttype     | -                   | -                      |
+| duration        | -                   | -                      |
+| username        | -                   | -                      |
+| webcat          | -                   | -                      |
+|referer          | -                   | -                      |
+|respcode         | -                   | -                      |
+|uriport          | -                   | -                      |
+|uripath          | -                   | -                      |    
+|uriquery         | -                   | -                      |
+|serverip         | -                   | -                      |
+|scbytes          | -                   | -                      |
+|csbytes          | -                   | -                      |
+|fulluri          | -                   | -                      |
+| respcode_name   | -                   | Optional               |
+| uri_rep         | -                   | Optional               |
+| network_context | -                   | Optional               |
+
+
+### Flow Schema for spot-ingest
+The table shows flow schema attributes and the rules used specifically for ingest.
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| treceived  	  | -	                | -                      |
+| unix_tstamp	  | -	                | produced by ingest     |
+| tryear     	  | -	                | produced by spot-nfdump|
+| trmonth    	  | -	                | produced by spot-nfdump|
+| trday      	  | - 	                | produced by spot-nfdump|
+| trhour     	  | -	                | produced by spot-nfdump|
+| trminute   	  | -	                | produced by spot-nfdump|
+| trsec      	  | -	                | produced by spot-nfdump|
+| tdur       	  | -	                | -                      |
+| sip        	  | -	                | -                      |
+| dip        	  | -	                | -                      |
+| sport      	  | -	                | -                      |
+| dport      	  | -	                | -                      |
+| proto      	  | -	                | -                      |
+| flag       	  | -	                | -                      |
+| fwd        	  | -	                | -                      |
+| stos       	  | -	                | -                      |
+| ipkt       	  | -	                | -                      |
+| ibyt       	  | -	                | -                      |
+| opkt       	  | -	                | -                      |
+| obyt       	  | -	                | -                      |
+| input      	  | -	                | -                      |
+| output     	  | -	                | -                      |
+| sas        	  | -	                | -                      |
+| das        	  | -	                | -                      |
+| dtos       	  | -	                | -                      |  
+| dir        	  | -	                | -                      |
+| rip        	  | -	                | -                      |
+
+
+### Flow Schema for spot-ml
+The table shows flow schema attributes and the rules used specifically for machine learning (ml).
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| treceived  	  | Can't be null	    | -                      |
+| tryear     	  |	-                   | -                      |
+| trmonth    	  | -                   | -	                     |
+| trday      	  |	-                   | -                      |
+| trhour     	  | Should be a number between 0 and 23| -	     |
+| trminute   	  | Should be a number between 0 and 59| -	     |
+| trsec      	  | Should be a number between 0 and 59| -	     |
+| tdur       	  | -                   | - 	                 |
+| sip        	  | Can't be null	    | -                      |
+| dip        	  | Can't be null	    | -                      |
+| sport      	  | Should be greater than or equal to 0| - 	         |
+| dport      	  | Should be greater than or equal to 0| -           |
+| proto      	  | -                   | -	                     |
+| ipkt       	  | Should be greater than or equal to 0| -	         |
+| ibyt       	  | Should be greater than or equal to 0| -           |
+| opkt       	  |	-                   | -                      |
+| obyt       	  |	-                   | -                      |
+
+
+### Flow Schema for spot-oa
+The table shows flow schema attributes and the rules used specifically for operation analytics (oa).
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| treceived    	  | -	                | -                      |
+| sip             | -	                | -                      |
+| dip             | -	                | -                      |
+| sport           | -	                | -                      |
+| dport           | -	                | -                      |
+| proto           | -	                | -                      |
+| flag            | -	                | -                      |
+| stos            | -	                | -                      |
+| ipkt            | -	                | -                      |
+| ibyt            | -	                | -                      |
+| opkt            | -	                | -                      |
+| obyt            | -	                | -                      |
+| input           | -	                | -                      |
+| output          | -	                | -                      |
+| rip             | -	                | -                      |
+| ML_score        | -	                | -                      |
+
+
+
+### Flow Schema for spot-ui
+The table shows flow schema attributes and the rules used specifically for user interface (ui).
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| treceived       | -	                | -                      |
+| sip             | -	                | -                      |
+| dip             | -	                | -                      | 
+| sport           | -	                | -                      |
+| dport           | -	                | -                      |
+| proto           | -	                | -                      |
+| flag            | -	                | -                      |
+| stos            | -	                | -                      |
+| ipkt            | -	                | -                      |
+| ibyt            | -	                | -                      |
+| opkt            | -	                | -                      |
+| obyt            | -	                | -                      |
+| input           | -	                | -                      |
+| output          | -	                | -                      |
+| rip             | -	                | -                      |
+| rank            | -	                | -                      |
+| srcip_internal  | -	                | -                      |
+| dstip_internal  | -	                | -                      |
+| src_geoloc      | -	                | -                      |
+| dst_geoloc      | -	                | -                      |
+| src_domain      | -	                | -                      |
+| dst_domain      | -	                | -                      |
+| src_rep         | -	                | -                      |
+| dst_rep         | -	                | -                      |
+
+
+### DNS Schema for spot-ingest
+The table shows DNS schema attributes and the rules used specifically for ingest.
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| frame_time   	  | -	                | -                      |
+| unix_tstamp  	  | -	                | -                      |
+| frame_len    	  | -	                | -                      | 
+| ip_dst       	  | -	                | -                      |
+| ip_src       	  | -	                | -                      |
+| dns_qry_name 	  | -	                | -                      |
+| dns_qry_class	  | -	                | -                      |
+| dns_qry_type 	  | -	                | -                      |
+| dns_qry_rcode	  | -	                | -                      |
+| dns_a        	  | -	                | -                      |
+
+
+### DNS Schema for spot-ml
+The table shows DNS schema attributes and the rules used specifically for machine learning (ml).
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| frame_time   	  | Can't be null, empty string or "-"	                               |-|
+| unix_tstamp  	  | Should be a number equal or greater than 0	                       |-|
+| frame_len    	  | Should be a number equal or greater than 0	                       |-|
+| ip_dst       	  | Can't be null, neither empty string or "-"	                       |-|
+| dns_qry_name 	  | Can't be null, neither empty string or "-"	                       |-|
+| dns_qry_class	  | If dns_qry_type and dns_qry_rcode are null, this one can't be null |-|
+| dns_qry_type 	  | If dns_qry_class and dns_qry_rcode are null, this can't be null	   |-|
+| dns_qry_rcode	  | If dns_qry_class and dns_qry_type are null, this can't be null	   |-|
+
+
+### DNS Schema for spot-oa
+The table shows DNS schema attributes and the rules used specifically for operation analytics (oa).
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| frame_time      | -	                | -                      |
+| unix_tstamp     | -	                | -                      |
+| frame_len       | -	                | -                      |
+| ip_dst          | -	                | -                      |
+| ip_src          | -	                | -                      |
+| dns_qry_name    | -	                | -                      |
+| dns_qry_class   | -	                | -                      |
+| dns_qry_type    | -	                | -                      |
+| dns_qry_rcode   | -	                | -                      |
+| ML_score        | -	                | -                      |
+
+
+### DNS Schema for spot-ui
+The table shows DNS schema attributes and the rules used specifically for user interface (ui).
+
+| Spot field name | Rules               | Comments               |
+|-----------------|---------------------|------------------------| 
+| dns_a           | -	                | -                      |
+| ML_score        | -	                | -                      |
+| tld             | -	                | -                      |
+| query_rep       | -	                | -                      |
+| hh              | -	                | -                      |
+| dns_qry_class_name | -                | -                      |
+| dns_qry_type_name| -	                | -                      |
+| dns_qry_rcode_name | -                | -                      |
+| network_context | -	                | -                      |
\ No newline at end of file
diff --git a/spot-setup/KERBEROS.md b/spot-setup/KERBEROS.md
new file mode 100644
index 0000000..a980d1c
--- /dev/null
+++ b/spot-setup/KERBEROS.md
@@ -0,0 +1,50 @@
+## Kerberos support installation
+
+run the following in addition to the typical installation instructions
+
+### Spot-Ingest
+
+`pip install -r ./spot-ingest/kerberos-requirements.txt`
+
+### Spot-OA
+
+`pip install -r ./spot-oa/kerberos-requirements.txt`
+
+
+## spot.conf
+
+KERBEROS       =  set `KERBEROS='true'` in /etc/spot.conf to enable kerberos
+KEYTAB         =  should be generated using `ktutil` or another approved method
+SASL_MECH      =  SASL mechanism, `GSSAPI` for kerberos; set SECURITY_PROTO to `sasl_plaintext` unless using ssl
+KAFKA_SERVICE_NAME =  if not set defaults will be used
+
+SSL            =  enable ssl by setting to true
+SSL_VERIFY     =  by setting to `false` disables host checking **important** only recommended in non production environments
+CA_LOCATION    =  location of certificate authority file
+CERT           =  host certificate
+KEY            =  key required for host certificate
+
+sample below:
+
+```
+#kerberos config
+KERBEROS='true'
+KINIT=/usr/bin/kinit
+PRINCIPAL='spot'
+KEYTAB='/opt/security/spot.keytab'
+SASL_MECH='GSSAPI'
+SECURITY_PROTO='sasl_plaintext'
+KAFKA_SERVICE_NAME=''
+
+#ssl config
+SSL='false'
+SSL_VERIFY='true'
+CA_LOCATION=''
+CERT=''
+KEY=''
+
+```
+
+Please see [LIBRDKAFKA Configurations](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
+for reference to additional settings that can be set by modifying `spot-ingest/common/kafka_client.py`
+
diff --git a/spot-setup/README.md b/spot-setup/README.md
index 1d486a6..c5d245a 100644
--- a/spot-setup/README.md
+++ b/spot-setup/README.md
@@ -21,6 +21,11 @@
 
 The main script in the repository is **hdfs_setup.sh** which is responsible of loading environment variables, creating folders in Hadoop for the different use cases (flow, DNS or Proxy), create the Impala database, and finally execute Impala query scripts that creates Impala tables needed to access netflow, dns and proxy data.
 
+Options:
+--no-sudo     will execute commands as the existing user while setting `HADOOP_USER_NAME=hdfs`
+-c            specify a custom location for the spot.conf, defaults to /etc/spot.conf
+-d            specify which database client to use, e.g. `-d beeline` NOTE: Impala supports kerberos
+
 ## Environment Variables
 
 **spot.conf** is the file storing the variables needed during the installation process including node assignment, User interface, Machine Learning and Ingest gateway nodes.
@@ -33,6 +38,8 @@
 
 spot-setup contains a script per use case, as of today, there is a table creation script for each DNS, flow and Proxy data.
 
+The HQL scripts are separated by the underlying database in the ./spot-setup/ folder.
+
 These HQL scripts are intended to be executed as a Impala statement and must comply HQL standards.
 
 We create tables using Parquet format to get a faster query performance. This format is an industry standard and you can find more information about it on:
diff --git a/spot-setup/create_dns_parquet.hql b/spot-setup/beeline/create_dns_parquet.hql
similarity index 74%
copy from spot-setup/create_dns_parquet.hql
copy to spot-setup/beeline/create_dns_parquet.hql
index 38025c6..b9be108 100755
--- a/spot-setup/create_dns_parquet.hql
+++ b/spot-setup/beeline/create_dns_parquet.hql
@@ -14,8 +14,7 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 
-
-CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns (
 frame_time STRING, 
 unix_tstamp BIGINT,
 frame_len INT,
@@ -34,10 +33,10 @@
 h TINYINT
 )
 STORED AS PARQUET 
-LOCATION '${var:huser}/dns/hive';
+LOCATION '${huser}/dns/hive';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_dendro (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_dendro (
 unix_tstamp BIGINT,
 dns_a STRING,
 dns_qry_name STRING,
@@ -49,10 +48,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/dendro';
+LOCATION '${huser}/dns/hive/oa/dendro';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_edge ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_edge (
 unix_tstamp BIGINT,
 frame_len BIGINT,
 ip_dst STRING,
@@ -74,10 +73,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/edge';
+LOCATION '${huser}/dns/hive/oa/edge';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_ingest_summary ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_ingest_summary (
 tdate STRING,
 total BIGINT
 )
@@ -87,10 +86,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/summary';
+LOCATION '${huser}/dns/hive/oa/summary';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_scores ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_scores (
 frame_time STRING, 
 unix_tstamp BIGINT,
 frame_len BIGINT,
@@ -114,10 +113,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/suspicious';
+LOCATION '${huser}/dns/hive/oa/suspicious';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_storyboard ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_storyboard (
 ip_threat STRING,
 dns_threat STRING, 
 title STRING,
@@ -129,10 +128,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/storyboard';
+LOCATION '${huser}/dns/hive/oa/storyboard';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_threat_dendro (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_threat_dendro (
 anchor STRING, 
 total BIGINT,
 dns_qry_name STRING, 
@@ -144,10 +143,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/threat_dendro';
+LOCATION '${huser}/dns/hive/oa/threat_dendro';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_threat_investigation ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_threat_investigation (
 unix_tstamp BIGINT,
 ip_dst STRING, 
 dns_qry_name STRING, 
@@ -160,4 +159,4 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/threat_investigation';
+LOCATION '${huser}/dns/hive/oa/threat_investigation';
diff --git a/spot-setup/create_flow_parquet.hql b/spot-setup/beeline/create_flow_parquet.hql
similarity index 75%
copy from spot-setup/create_flow_parquet.hql
copy to spot-setup/beeline/create_flow_parquet.hql
index 41c4819..25e860a 100755
--- a/spot-setup/create_flow_parquet.hql
+++ b/spot-setup/beeline/create_flow_parquet.hql
@@ -14,8 +14,7 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 
-
-CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow (
 treceived STRING,
 unix_tstamp BIGINT,
 tryear INT,
@@ -52,10 +51,10 @@
 h TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive';
+LOCATION '${huser}/flow/hive';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_chords (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_chords (
 ip_threat STRING,
 srcip STRING,
 dstip STRING,
@@ -68,10 +67,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/chords';
+LOCATION '${huser}/flow/hive/oa/chords';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_edge (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_edge (
 tstart STRING, 
 srcip STRING,
 dstip STRING,
@@ -96,10 +95,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/edge';
+LOCATION '${huser}/flow/hive/oa/edge';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_ingest_summary (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_ingest_summary (
 tdate STRING,
 total BIGINT 
 )
@@ -109,10 +108,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/summary';
+LOCATION '${huser}/flow/hive/oa/summary';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_scores (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_scores (
 tstart STRING, 
 srcip STRING,
 dstip STRING,
@@ -140,10 +139,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/suspicious';
+LOCATION '${huser}/flow/hive/oa/suspicious';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_storyboard (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_storyboard (
 ip_threat STRING,
 title STRING,
 text STRING
@@ -154,10 +153,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/storyboard';
+LOCATION '${huser}/flow/hive/oa/storyboard';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_threat_investigation ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_threat_investigation (
 tstart STRING,
 srcip STRING, 
 dstip STRING, 
@@ -171,10 +170,10 @@
 d TINYINT
 ) 
 STORED AS PARQUET 
-LOCATION '${var:huser}/flow/hive/oa/threat_investigation';
+LOCATION '${huser}/flow/hive/oa/threat_investigation';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_timeline (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_timeline (
 ip_threat STRING,
 tstart STRING, 
 tend STRING, 
@@ -192,4 +191,4 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/timeline';
+LOCATION '${huser}/flow/hive/oa/timeline';
diff --git a/spot-setup/create_proxy_parquet.hql b/spot-setup/beeline/create_proxy_parquet.hql
similarity index 77%
copy from spot-setup/create_proxy_parquet.hql
copy to spot-setup/beeline/create_proxy_parquet.hql
index f665dc2..d9cd79f 100755
--- a/spot-setup/create_proxy_parquet.hql
+++ b/spot-setup/beeline/create_proxy_parquet.hql
@@ -14,8 +14,10 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 
+SET huser;
+SET dbname;
 
-CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy (
 p_date STRING,
 p_time STRING,
 clientip STRING,
@@ -52,10 +54,10 @@
 h STRING
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive';
+LOCATION '${huser}/proxy/hive';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_edge ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_edge (
 tdate STRING,
 time STRING, 
 clientip STRING, 
@@ -80,10 +82,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/edge';
+LOCATION '${huser}/proxy/hive/oa/edge';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_ingest_summary ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_ingest_summary (
 tdate STRING,
 total BIGINT 
 )
@@ -93,10 +95,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/summary';
+LOCATION '${huser}/proxy/hive/oa/summary';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_scores ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_scores (
 tdate STRING,
 time STRING, 
 clientip STRING, 
@@ -128,10 +130,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/suspicious';
+LOCATION '${huser}/proxy/hive/oa/suspicious';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_storyboard ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_storyboard (
 p_threat STRING, 
 title STRING,
 text STRING
@@ -142,10 +144,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/storyboard';
+LOCATION '${huser}/proxy/hive/oa/storyboard';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_threat_investigation ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_threat_investigation (
 tdate STRING,
 fulluri STRING,
 uri_sev INT
@@ -156,10 +158,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/threat_investigation';
+LOCATION '${huser}/proxy/hive/oa/threat_investigation';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_timeline ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_timeline (
 p_threat STRING, 
 tstart STRING, 
 tend STRING, 
@@ -174,4 +176,4 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/timeline';
+LOCATION '${huser}/proxy/hive/oa/timeline';
diff --git a/spot-setup/hdfs_setup.sh b/spot-setup/hdfs_setup.sh
index 86a26c0..13e8fcb 100755
--- a/spot-setup/hdfs_setup.sh
+++ b/spot-setup/hdfs_setup.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
@@ -17,6 +17,40 @@
 # limitations under the License.
 #
 
+set -e
+
+function help() {
+    printf '%s' "
+   Initialize folders and databases for Spot in Hadoop.
+
+ Options:
+  --no-sudo         Do not use sudo with hdfs commands.
+  -c                Specify config file (default = /etc/spot.conf)
+  -d                Override databases
+  -h, --help        Display this help and exit
+"
+    exit 0  # usage was requested explicitly, so stop here with success
+}
+
+log() {  # print the message in $1 under a "hdfs_setup.sh:" banner
+    printf "hdfs_setup.sh:\\n %s\\n\\n" "${1}"
+}
+
+function safe_mkdir() {
+        # usage: safe_mkdir "<hdfs command prefix>" "<hdfs directory>"
+        # creates the directory only when it is missing, so that re-running the
+        # script does not exit (set -e) on folders that already exist
+        local hdfs_cmd=$1
+        local dir=$2
+        if ${hdfs_cmd} dfs -test -d "${dir}"; then  # unquoted: prefix may be several words
+            log "${dir} already exists"
+        else
+            log "running mkdir on ${dir}"
+            ${hdfs_cmd} dfs -mkdir "${dir}"
+        fi
+}
+
+SPOTCONF="/etc/spot.conf"
 DSOURCES=('flow' 'dns' 'proxy')
 DFOLDERS=('binary' 
 'stage'
@@ -33,37 +67,117 @@
 'hive/oa/threat_dendro'
 )
 
+
+# input options
+# Consume options left to right; $1 is always the option being examined.
+while [[ $# -gt 0 ]]; do
+    case $1 in
+        "--no-sudo")
+            log "not using sudo"
+            no_sudo=true
+            shift
+            ;;
+        "-c")
+            SPOTCONF=${2:?"-c requires a config file path"}
+            log "Spot Configuration file: ${SPOTCONF}"
+            shift 2
+            ;;
+        "-d")
+            db_override=${2:?"-d requires a database engine name"}
+            shift 2
+            ;;
+        "-h"|"--help")
+            help
+            ;;
+        *) shift ;;  # unrecognised arguments are skipped, as before
+    esac
+done
+
 # Sourcing spot configuration variables
-source /etc/spot.conf
+log "Sourcing ${SPOTCONF}"
+source "$SPOTCONF"
+
+if [[ ${no_sudo} == "true" ]]; then
+    hdfs_cmd="hdfs"
+    # the default is exported so the hdfs commands started below inherit it
+    if [[ ! -z "${HADOOP_USER_NAME}" ]]; then
+        log "HADOOP_USER_NAME: ${HADOOP_USER_NAME}"
+    else
+        log "setting HADOOP_USER_NAME to hdfs"
+        export HADOOP_USER_NAME=hdfs
+    fi
+else
+    hdfs_cmd="sudo -u hdfs hdfs"
+fi
+
+if [[ -z "${db_override}" ]]; then  # no -d given: keep DBENGINE from spot.conf
+        DBENGINE=$(echo "${DBENGINE}" | tr '[:upper:]' '[:lower:]')
+        log "setting database engine to ${DBENGINE}"
+else  # -d given: it replaces DBENGINE; lower-cased to match the case branches below
+        DBENGINE=$(echo "${db_override}" | tr '[:upper:]' '[:lower:]')
+        log "setting database engine to ${DBENGINE}"
+fi
+
+case ${DBENGINE} in
+    impala)
+        db_shell="impala-shell -i ${IMPALA_DEM}"
+        if [[ ${KERBEROS} == "true" ]]; then
+            db_shell="${db_shell} -k"
+        fi
+        db_query="${db_shell} -q"
+        db_script="${db_shell} --var=huser=${HUSER} --var=dbname=${DBNAME} -c -f"
+        ;;
+    hive)
+        if [[ ${no_sudo} == "true" ]]; then
+            db_shell="hive"
+        else
+            db_shell="sudo -u hive hive"
+        fi
+        db_query="${db_shell} -e"
+        db_script="${db_shell} -hiveconf huser=${HUSER} -hiveconf dbname=${DBNAME} -f"
+        ;;
+    beeline)
+        db_shell="beeline -u jdbc:${JDBC_URL}"
+        db_query="${db_shell} -e"
+        db_script="${db_shell} --hivevar huser=${HUSER} --hivevar dbname=${DBNAME} -f"
+        ;;
+    *)
+        log "DBENGINE not compatible or not set in spot.conf: DBENGINE--> ${DBENGINE:-empty}"
+        exit 1
+        ;;
+esac
 
 # Creating HDFS user's folder
-sudo -u hdfs hdfs dfs -mkdir ${HUSER}
-sudo -u hdfs hdfs dfs -chown ${USER}:supergroup ${HUSER}
-sudo -u hdfs hdfs dfs -chmod 775 ${HUSER}
+safe_mkdir "${hdfs_cmd}" "${HUSER}"
+${hdfs_cmd} dfs -chown "${USER}":supergroup "${HUSER}"
+${hdfs_cmd} dfs -chmod 775 "${HUSER}"
 
 # Creating HDFS paths for each use case
 for d in "${DSOURCES[@]}" 
-do 
+do
 	echo "creating /$d"
-	hdfs dfs -mkdir ${HUSER}/$d 
+	safe_mkdir "${hdfs_cmd}" "${HUSER}/$d"
 	for f in "${DFOLDERS[@]}" 
 	do 
 		echo "creating $d/$f"
-		hdfs dfs -mkdir ${HUSER}/$d/$f
+		safe_mkdir "${hdfs_cmd}" "${HUSER}/$d/$f"
 	done
 
 	# Modifying permission on HDFS folders to allow Impala to read/write
-	hdfs dfs -chmod -R 775 ${HUSER}/$d
-	sudo -u hdfs hdfs dfs -setfacl -R -m user:impala:rwx ${HUSER}/$d
-	sudo -u hdfs hdfs dfs -setfacl -R -m user:${USER}:rwx ${HUSER}/$d
+	${hdfs_cmd} dfs -chmod -R 775 "${HUSER}"/"$d"
+	${hdfs_cmd} dfs -setfacl -R -m user:"${DBENGINE}":rwx "${HUSER}"/"$d"  # DBENGINE is always set here; db_override is empty without -d
+	${hdfs_cmd} dfs -setfacl -R -m user:"${USER}":rwx "${HUSER}"/"$d"
 done
 
+
 # Creating Spot Database
-impala-shell -i ${IMPALA_DEM} -q "CREATE DATABASE IF NOT EXISTS ${DBNAME};"
+log "Creating Spot Database"
+${db_query} "CREATE DATABASE IF NOT EXISTS ${DBNAME}";
 
-# Creating Impala tables
+
+# Creating tables
+log "Creating Database tables"
 for d in "${DSOURCES[@]}" 
-do 
-	impala-shell -i ${IMPALA_DEM} --var=huser=${HUSER} --var=dbname=${DBNAME} -c -f create_${d}_parquet.hql
+do
+	${db_script} "./${DBENGINE}/create_${d}_parquet.hql"
 done
-
diff --git a/spot-setup/create_dns_parquet.hql b/spot-setup/hive/create_dns_parquet.hql
similarity index 70%
copy from spot-setup/create_dns_parquet.hql
copy to spot-setup/hive/create_dns_parquet.hql
index 38025c6..8e31ed3 100755
--- a/spot-setup/create_dns_parquet.hql
+++ b/spot-setup/hive/create_dns_parquet.hql
@@ -14,8 +14,10 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 
+SET hiveconf:huser;
+SET hiveconf:dbname;
 
-CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns (
 frame_time STRING, 
 unix_tstamp BIGINT,
 frame_len INT,
@@ -34,10 +36,10 @@
 h TINYINT
 )
 STORED AS PARQUET 
-LOCATION '${var:huser}/dns/hive';
+LOCATION '${hiveconf:huser}/dns/hive';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_dendro (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_dendro (
 unix_tstamp BIGINT,
 dns_a STRING,
 dns_qry_name STRING,
@@ -49,10 +51,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/dendro';
+LOCATION '${hiveconf:huser}/dns/hive/oa/dendro';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_edge ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_edge (
 unix_tstamp BIGINT,
 frame_len BIGINT,
 ip_dst STRING,
@@ -74,10 +76,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/edge';
+LOCATION '${hiveconf:huser}/dns/hive/oa/edge';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_ingest_summary ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_ingest_summary (
 tdate STRING,
 total BIGINT
 )
@@ -87,10 +89,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/summary';
+LOCATION '${hiveconf:huser}/dns/hive/oa/summary';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_scores ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_scores (
 frame_time STRING, 
 unix_tstamp BIGINT,
 frame_len BIGINT,
@@ -114,10 +116,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/suspicious';
+LOCATION '${hiveconf:huser}/dns/hive/oa/suspicious';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_storyboard ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_storyboard (
 ip_threat STRING,
 dns_threat STRING, 
 title STRING,
@@ -129,10 +131,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/storyboard';
+LOCATION '${hiveconf:huser}/dns/hive/oa/storyboard';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_threat_dendro (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_threat_dendro (
 anchor STRING, 
 total BIGINT,
 dns_qry_name STRING, 
@@ -144,10 +146,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/threat_dendro';
+LOCATION '${hiveconf:huser}/dns/hive/oa/threat_dendro';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_threat_investigation ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_threat_investigation (
 unix_tstamp BIGINT,
 ip_dst STRING, 
 dns_qry_name STRING, 
@@ -160,4 +162,4 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/threat_investigation';
+LOCATION '${hiveconf:huser}/dns/hive/oa/threat_investigation';
diff --git a/spot-setup/create_flow_parquet.hql b/spot-setup/hive/create_flow_parquet.hql
similarity index 71%
copy from spot-setup/create_flow_parquet.hql
copy to spot-setup/hive/create_flow_parquet.hql
index 41c4819..034e194 100755
--- a/spot-setup/create_flow_parquet.hql
+++ b/spot-setup/hive/create_flow_parquet.hql
@@ -14,8 +14,10 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 
+SET hiveconf:huser;
+SET hiveconf:dbname;
 
-CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow (
 treceived STRING,
 unix_tstamp BIGINT,
 tryear INT,
@@ -52,10 +54,10 @@
 h TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive';
+LOCATION '${hiveconf:huser}/flow/hive';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_chords (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_chords (
 ip_threat STRING,
 srcip STRING,
 dstip STRING,
@@ -68,10 +70,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/chords';
+LOCATION '${hiveconf:huser}/flow/hive/oa/chords';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_edge (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_edge (
 tstart STRING, 
 srcip STRING,
 dstip STRING,
@@ -96,10 +98,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/edge';
+LOCATION '${hiveconf:huser}/flow/hive/oa/edge';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_ingest_summary (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_ingest_summary (
 tdate STRING,
 total BIGINT 
 )
@@ -109,10 +111,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/summary';
+LOCATION '${hiveconf:huser}/flow/hive/oa/summary';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_scores (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_scores (
 tstart STRING, 
 srcip STRING,
 dstip STRING,
@@ -140,10 +142,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/suspicious';
+LOCATION '${hiveconf:huser}/flow/hive/oa/suspicious';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_storyboard (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_storyboard (
 ip_threat STRING,
 title STRING,
 text STRING
@@ -154,10 +156,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/storyboard';
+LOCATION '${hiveconf:huser}/flow/hive/oa/storyboard';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_threat_investigation ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_threat_investigation (
 tstart STRING,
 srcip STRING, 
 dstip STRING, 
@@ -171,10 +173,10 @@
 d TINYINT
 ) 
 STORED AS PARQUET 
-LOCATION '${var:huser}/flow/hive/oa/threat_investigation';
+LOCATION '${hiveconf:huser}/flow/hive/oa/threat_investigation';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_timeline (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_timeline (
 ip_threat STRING,
 tstart STRING, 
 tend STRING, 
@@ -192,4 +194,4 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/timeline';
+LOCATION '${hiveconf:huser}/flow/hive/oa/timeline';
diff --git a/spot-setup/create_proxy_parquet.hql b/spot-setup/hive/create_proxy_parquet.hql
similarity index 74%
copy from spot-setup/create_proxy_parquet.hql
copy to spot-setup/hive/create_proxy_parquet.hql
index f665dc2..16d90c0 100755
--- a/spot-setup/create_proxy_parquet.hql
+++ b/spot-setup/hive/create_proxy_parquet.hql
@@ -14,8 +14,10 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 
+SET hiveconf:huser;
+SET hiveconf:dbname;
 
-CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy (
 p_date STRING,
 p_time STRING,
 clientip STRING,
@@ -52,10 +54,10 @@
 h STRING
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive';
+LOCATION '${hiveconf:huser}/proxy/hive';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_edge ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_edge (
 tdate STRING,
 time STRING, 
 clientip STRING, 
@@ -80,10 +82,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/edge';
+LOCATION '${hiveconf:huser}/proxy/hive/oa/edge';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_ingest_summary ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_ingest_summary (
 tdate STRING,
 total BIGINT 
 )
@@ -93,10 +95,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/summary';
+LOCATION '${hiveconf:huser}/proxy/hive/oa/summary';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_scores ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_scores (
 tdate STRING,
 time STRING, 
 clientip STRING, 
@@ -128,10 +130,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/suspicious';
+LOCATION '${hiveconf:huser}/proxy/hive/oa/suspicious';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_storyboard ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_storyboard (
 p_threat STRING, 
 title STRING,
 text STRING
@@ -142,10 +144,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/storyboard';
+LOCATION '${hiveconf:huser}/proxy/hive/oa/storyboard';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_threat_investigation ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_threat_investigation (
 tdate STRING,
 fulluri STRING,
 uri_sev INT
@@ -156,10 +158,10 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/threat_investigation';
+LOCATION '${hiveconf:huser}/proxy/hive/oa/threat_investigation';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_timeline ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_timeline (
 p_threat STRING, 
 tstart STRING, 
 tend STRING, 
@@ -174,4 +176,4 @@
 d TINYINT
 )
 STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/timeline';
+LOCATION '${hiveconf:huser}/proxy/hive/oa/timeline';
diff --git a/spot-setup/create_dns_parquet.hql b/spot-setup/impala/create_dns_parquet.hql
similarity index 85%
rename from spot-setup/create_dns_parquet.hql
rename to spot-setup/impala/create_dns_parquet.hql
index 38025c6..274ea9d 100755
--- a/spot-setup/create_dns_parquet.hql
+++ b/spot-setup/impala/create_dns_parquet.hql
@@ -37,7 +37,7 @@
 LOCATION '${var:huser}/dns/hive';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_dendro (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_dendro (
 unix_tstamp BIGINT,
 dns_a STRING,
 dns_qry_name STRING,
@@ -52,7 +52,7 @@
 LOCATION '${var:huser}/dns/hive/oa/dendro';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_edge ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_edge (
 unix_tstamp BIGINT,
 frame_len BIGINT,
 ip_dst STRING,
@@ -77,7 +77,7 @@
 LOCATION '${var:huser}/dns/hive/oa/edge';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_ingest_summary ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_ingest_summary (
 tdate STRING,
 total BIGINT
 )
@@ -90,7 +90,7 @@
 LOCATION '${var:huser}/dns/hive/oa/summary';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_scores ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_scores (
 frame_time STRING, 
 unix_tstamp BIGINT,
 frame_len BIGINT,
@@ -117,7 +117,7 @@
 LOCATION '${var:huser}/dns/hive/oa/suspicious';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_storyboard ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_storyboard (
 ip_threat STRING,
 dns_threat STRING, 
 title STRING,
@@ -132,7 +132,7 @@
 LOCATION '${var:huser}/dns/hive/oa/storyboard';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_threat_dendro (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_threat_dendro (
 anchor STRING, 
 total BIGINT,
 dns_qry_name STRING, 
@@ -147,7 +147,7 @@
 LOCATION '${var:huser}/dns/hive/oa/threat_dendro';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.dns_threat_investigation ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_threat_investigation (
 unix_tstamp BIGINT,
 ip_dst STRING, 
 dns_qry_name STRING, 
diff --git a/spot-setup/create_flow_parquet.hql b/spot-setup/impala/create_flow_parquet.hql
similarity index 86%
rename from spot-setup/create_flow_parquet.hql
rename to spot-setup/impala/create_flow_parquet.hql
index 41c4819..c8d3481 100755
--- a/spot-setup/create_flow_parquet.hql
+++ b/spot-setup/impala/create_flow_parquet.hql
@@ -55,7 +55,7 @@
 LOCATION '${var:huser}/flow/hive';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_chords (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_chords (
 ip_threat STRING,
 srcip STRING,
 dstip STRING,
@@ -71,7 +71,7 @@
 LOCATION '${var:huser}/flow/hive/oa/chords';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_edge (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_edge (
 tstart STRING, 
 srcip STRING,
 dstip STRING,
@@ -99,7 +99,7 @@
 LOCATION '${var:huser}/flow/hive/oa/edge';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_ingest_summary (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_ingest_summary (
 tdate STRING,
 total BIGINT 
 )
@@ -112,7 +112,7 @@
 LOCATION '${var:huser}/flow/hive/oa/summary';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_scores (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_scores (
 tstart STRING, 
 srcip STRING,
 dstip STRING,
@@ -143,7 +143,7 @@
 LOCATION '${var:huser}/flow/hive/oa/suspicious';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_storyboard (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_storyboard (
 ip_threat STRING,
 title STRING,
 text STRING
@@ -157,7 +157,7 @@
 LOCATION '${var:huser}/flow/hive/oa/storyboard';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_threat_investigation ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_threat_investigation (
 tstart STRING,
 srcip STRING, 
 dstip STRING, 
@@ -174,7 +174,7 @@
 LOCATION '${var:huser}/flow/hive/oa/threat_investigation';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.flow_timeline (
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_timeline (
 ip_threat STRING,
 tstart STRING, 
 tend STRING, 
diff --git a/spot-setup/create_proxy_parquet.hql b/spot-setup/impala/create_proxy_parquet.hql
similarity index 87%
rename from spot-setup/create_proxy_parquet.hql
rename to spot-setup/impala/create_proxy_parquet.hql
index f665dc2..ddf3283 100755
--- a/spot-setup/create_proxy_parquet.hql
+++ b/spot-setup/impala/create_proxy_parquet.hql
@@ -55,7 +55,7 @@
 LOCATION '${var:huser}/proxy/hive';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_edge ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_edge (
 tdate STRING,
 time STRING, 
 clientip STRING, 
@@ -83,7 +83,7 @@
 LOCATION '${var:huser}/proxy/hive/oa/edge';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_ingest_summary ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_ingest_summary (
 tdate STRING,
 total BIGINT 
 )
@@ -96,7 +96,7 @@
 LOCATION '${var:huser}/proxy/hive/oa/summary';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_scores ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_scores (
 tdate STRING,
 time STRING, 
 clientip STRING, 
@@ -131,7 +131,7 @@
 LOCATION '${var:huser}/proxy/hive/oa/suspicious';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_storyboard ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_storyboard (
 p_threat STRING, 
 title STRING,
 text STRING
@@ -145,7 +145,7 @@
 LOCATION '${var:huser}/proxy/hive/oa/storyboard';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_threat_investigation ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_threat_investigation (
 tdate STRING,
 fulluri STRING,
 uri_sev INT
@@ -159,7 +159,7 @@
 LOCATION '${var:huser}/proxy/hive/oa/threat_investigation';
 
 
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_timeline ( 
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_timeline (
 p_threat STRING, 
 tstart STRING, 
 tend STRING, 
diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf
index a0cba3d..aa08ea7 100755
--- a/spot-setup/spot.conf
+++ b/spot-setup/spot.conf
@@ -19,7 +19,6 @@
 UINODE='node03'
 MLNODE='node04'
 GWNODE='node16'
-DBNAME='spot'
 
 #hdfs - base user and data source config
 HUSER='/user/spot'
@@ -30,10 +29,35 @@
 FLOW_PATH=${HUSER}/${DSOURCE}/hive/y=${YR}/m=${MH}/d=${DY}/
 HPATH=${HUSER}/${DSOURCE}/scored_results/${FDATE}
 
-#impala config
+# Database
+DBNAME='spot'
+DBENGINE="" # supported engines: hive, impala, beeline
+JDBC_URL="" # example: hive2://node01:10000/default;principal=hive/node01@REALM.COM
+
+# impala config
 IMPALA_DEM=node04
 IMPALA_PORT=21050
 
+# HiveServer2 config
+HS2_HOST=''
+HS2_PORT=''
+
+# Kerberos config
+KERBEROS='false'
+KINIT=/usr/bin/kinit
+PRINCIPAL='user'
+KEYTAB='/opt/security/user.keytab'
+SASL_MECH='GSSAPI'
+SECURITY_PROTO='sasl_plaintext'
+KAFKA_SERVICE_NAME=''
+
+# SSL config
+SSL='false'
+SSL_VERIFY='true'
+CA_LOCATION=''
+CERT=''
+KEY=''
+
 #local fs base user and data source config
 LUSER='/home/spot'
 LPATH=${LUSER}/ml/${DSOURCE}/${FDATE}