blob: ba4c871d5d86739330ed1d30f0e0e5edaabb7706 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from elasticsearch import TransportError
from distill import es
class Brew (object):
"""
Brew supports basic CRUD operations and publishes the status
of Elasticsearch.
"""
@staticmethod
def get_status():
"""
Fetch the status of the underlying database instance.
:return: [bool] if connection to database instance has been established
"""
return es.ping(ignore=[400, 404])
@staticmethod
def get_applications():
"""
Fetch all the registered applications in Distill.
.. note:: Private indexes starting with a period are not included
in the result set
:return: [dict] dictionary of all registered applications and meta info
"""
doc = {}
query = {
"aggs": {
"count_by_type": {
"terms": {
"field": "_type",
"size": 100
}
}
}
}
try:
cluster_status = es.cat.indices(h=["index"], pri=False)
x = cluster_status.splitlines()
for idx in x:
idx = idx.rstrip()
# Ignore private indexes (like .kibana or .stout)
if idx[:1] != '.':
response = es.search(index=idx, body=query)
d = {}
for tag in response["aggregations"]["count_by_type"]["buckets"]:
d[tag['key']] = tag['doc_count']
doc[idx] = d
except TransportError as e:
doc['error'] = e.info
except Exception as e:
doc['error'] = str(e)
return doc
@staticmethod
def create(app):
"""
Register a new application in Distill
.. code-block:: bash
{
"application" : "xdata_v3",
"health" : "green",
"num_docs" : 0,
"status" : "open"
}
:param app: [string] application name (e.g. xdata_v3)
:return: [dict] dictionary of application and its meta information
"""
# ignore 400 cause by IndexAlreadyExistsException when creating index
res = es.indices.create(index=app, ignore=[400, 404])
doc = _get_cluster_status(app)
return doc
@staticmethod
def read(app, app_type=None):
"""
Fetch meta data associated with an application
.. code-block:: bash
Example:
{
"application" : "xdata_v3",
"health" : "green",
"num_docs" : "100",
"status" : "open"
"types" : {
"raw_logs" : {
"@timestamp" : "date",
"action" : "string",
"elementId" : "string"
},
"parsed" : {
"@timestamp" : "date",
"elementId_interval" : "string"
},
"graph" : {
"uniqueID" : "string",
"transition_count" : "long",
"p_value" : "float"
}
}
}
:param app: [string] application name (e.g. xdata_v3)
:return: [dict] dictionary of application and its meta information
"""
return _get_cluster_status(app, app_type=app_type)
@staticmethod
def update(app):
"""
.. todo::
Currently not implemented
"""
raise NotImplementedError()
@staticmethod
def delete(app):
"""
Technically closes the index so its content is not searchable.
.. code-block: bash
Example:
{
status: "Deleted index xdata_v3"
}
:param app: [string] application name (e.g. xdata_v3)
:return: [dict] status message of the event
"""
return es.indices.close(index=app, ignore=[400, 404])
def _get_cluster_status(app, app_type=None):
"""
Return cluster status, index health, and document count as string
@todo figure out how to count individual documents stored
at an app_type (currently shows only index count)
:param app: [string] application name (e.g. xdata_v3)
:return: [dict] dictionary of index meta data including field names
"""
doc = {}
try:
cluster_status = es.cat.indices(index=app,
h=["health", "status", "docs.count"],
pri=True,
ignore=[400, 404])
v = str(cluster_status).split(" ")
m = ["health", "status", "num_docs"]
doc = dict(zip(m, v))
# Add back application
doc["application"] = app
except TransportError as e:
doc['error'] = e.info
except Exception as e:
doc['error'] = str(e)
doc['fields'] = _get_all_fields(app, app_type)
return doc
def _parse_mappings(app, app_type=None):
"""
.. todo:
Need to parse out result set that presents field list and type
"""
doc = {}
try:
mappings = es.indices.get_mapping(index=app,
doc_type=[app_type],
ignore=[400, 404])
# mappings = yaml.safe_load (json.ess (mappings))
# print json.dumps (mappings [app]["mappings"], indent=4,
# separators=(',', ': '))
ignore = ["properties", "format"]
except TransportError as e:
doc['error'] = e.info
except Exception as e:
doc['error'] = str(e)
return doc
def _get_all_fields(app, app_type=None):
"""
Retrieve all possible fields in an application
:param app: [string] application name (e.g. xdata_v3)
:param app_type: [string] application type (e.g. logs)
:return: [list] list of strings representing the fields names
"""
d = list()
query = {
"aggs": {
"fields": {
"terms": {
"field": "_field_names",
"size": 100
}
}
}
}
try:
response = es.search(index=app, doc_type=app_type, body=query)
for tag in response['aggregations']['fields']['buckets']:
d.append(tag['key'])
except TransportError as e:
d.append(str(e.info))
except Exception as e:
d.append(str(e))
return d