blob: a49c8f9f19035f6c34aacec66c5fb800980bc3fc [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import csv
import os
import logging
from utils import Util
# Column identifiers. Each one is used both as a key of the ``config``
# mapping (where its value is the path of a CSV lookup file) and as the
# ``column`` argument of ``IanaTransform.get_name``.
COL_RCODE = 'dns_qry_rcode'
COL_QTYPE = 'dns_qry_type'
COL_CLASS = 'dns_qry_class'
COL_PRESP = 'proxy_http_rcode'


class IanaTransform(object):
    """Translate raw codes into names using CSV lookup files.

    One lookup table is kept per supported column (``COL_CLASS``,
    ``COL_QTYPE``, ``COL_RCODE``, ``COL_PRESP``).  Tables are loaded once,
    at construction time, from the CSV files named in ``config``.  A column
    whose file is not configured or does not exist simply gets an empty
    table, so every lookup for it yields ``"Unknown (<key>)"``.
    """

    def __init__(self, config, logger=None):
        """Store the configured file paths and load the lookup tables.

        Args:
            config: mapping that may contain any of the ``COL_*`` keys; the
                value of each key is the path of the matching CSV file.
                Missing keys are allowed.
            logger: when truthy, the standard-library logger named
                ``'OA.IANA'`` is used; otherwise one is requested from
                ``Util.get_logger``.  Only the truthiness of this argument
                is inspected, the object itself is not stored.
        """
        self._logger = (logging.getLogger('OA.IANA') if logger
                        else Util.get_logger('OA.IANA', create_file=False))

        # Always define every path attribute so that _init_dicts() can run
        # even when ``config`` omits some of the keys.
        self._qclass_file_path = config[COL_CLASS] if COL_CLASS in config else None
        self._qtype_file_path = config[COL_QTYPE] if COL_QTYPE in config else None
        self._rcode_file_path = config[COL_RCODE] if COL_RCODE in config else None
        self._http_rcode_file_path = config[COL_PRESP] if COL_PRESP in config else None

        self._qclass_dict = {}
        self._qtype_dict = {}
        self._rcode_dict = {}
        self._http_rcode_dict = {}
        self._init_dicts()

    @staticmethod
    def _read_rows(file_path, min_columns):
        """Return the data rows of a CSV file, without its header line.

        Args:
            file_path: path of the CSV file, or ``None``.
            min_columns: rows with fewer cells than this (blank lines
                included) are dropped so callers can index them safely.

        Returns:
            A list of rows (lists of strings); empty when ``file_path`` is
            unset, is not an existing file, or the file has no data rows.
        """
        if not file_path or not os.path.isfile(file_path):
            return []
        # Text mode: csv.reader needs str rows on Python 3, and plain 'r'
        # is also accepted by Python 2.
        with open(file_path, 'r') as csv_file:
            reader = csv.reader(csv_file)
            # Skip the header; the default keeps an empty file from
            # raising StopIteration.
            next(reader, None)
            return [row for row in reader if len(row) >= min_columns]

    def _init_dicts(self):
        """Populate the four lookup tables from their CSV files."""
        qclass_rows = self._read_rows(self._qclass_file_path, 3)
        # Both the first and the second column map to the third one.  The
        # second-column entries are applied last, so they win on collision.
        self._qclass_dict.update((row[0], row[2]) for row in qclass_rows)
        self._qclass_dict.update((row[1], row[2]) for row in qclass_rows)

        # Query types are keyed by the second column, valued by the first.
        self._qtype_dict = {
            row[1]: row[0]
            for row in self._read_rows(self._qtype_file_path, 2)
        }
        # Response codes are keyed by the first column, valued by the second.
        self._rcode_dict = {
            row[0]: row[1]
            for row in self._read_rows(self._rcode_file_path, 2)
        }
        self._http_rcode_dict = {
            row[0]: row[1]
            for row in self._read_rows(self._http_rcode_file_path, 2)
        }

    def get_name(self, key, column):
        """Return the name registered for ``key`` in the table of ``column``.

        Args:
            key: raw code to translate; compared as-is against the strings
                read from the CSV file.
            column: one of ``COL_CLASS``, ``COL_QTYPE``, ``COL_RCODE`` or
                ``COL_PRESP``.

        Returns:
            The mapped name, ``"Unknown (<key>)"`` when the key is not in
            the table, or ``None`` when ``column`` is not a supported one.
        """
        tables = {
            COL_CLASS: self._qclass_dict,
            COL_QTYPE: self._qtype_dict,
            COL_RCODE: self._rcode_dict,
            COL_PRESP: self._http_rcode_dict,
        }
        table = tables.get(column)
        # Compare with None explicitly: an empty table is falsy but valid.
        if table is None:
            return None
        if key in table:
            return table[key]
        return "Unknown ({0})".format(key)